use super::feeder::FeederState;
use super::memory::{self, MemoryGuard, PressureLevel};
use crate::storage::blockstore::BlockStore;
use crate::storage::disk_utxo::{
block_input_keys_batch_into_arc, block_input_keys_into_filtered,
block_input_keys_into_filtered_with_tx_ids, key_to_outpoint, outpoint_to_key, OutPointKey,
};
use crate::storage::ibd_utxo_store::{IbdUtxoStore, PendingFlushPackage};
use crate::storage::Storage;
use anyhow::Result;
use blvm_protocol::bip_validation::Bip30Index;
use blvm_protocol::{
segwit::Witness, BitcoinProtocolEngine, Block, BlockHeader, Hash, UtxoSet, UTXO,
};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info, warn};
// Per-thread memo of shared "all empty" witness-stack vectors, keyed by the
// transaction count. Pre-segwit blocks carry no witness data, so many blocks
// with the same tx count can share one Arc'd allocation instead of each
// allocating a fresh Vec<Vec<Witness>>. Bounded by the clear-at-512 policy in
// `shared_empty_witness_stacks`.
thread_local! {
    static EMPTY_WITNESS_STACKS: RefCell<FxHashMap<usize, Arc<Vec<Vec<Witness>>>>> =
        RefCell::new(FxHashMap::default());
}
/// Returns a shared vector of `n_tx` empty witness stacks.
///
/// Results are memoized per thread so repeated requests for the same
/// transaction count reuse a single allocation. The memo table is wiped once
/// it grows past 512 distinct sizes to bound per-thread memory; the freshly
/// built entry is re-inserted afterwards.
fn shared_empty_witness_stacks(n_tx: usize) -> Arc<Vec<Vec<Witness>>> {
    EMPTY_WITNESS_STACKS.with(|cell| {
        let mut cache = cell.borrow_mut();
        match cache.get(&n_tx) {
            Some(existing) => Arc::clone(existing),
            None => {
                let fresh: Arc<Vec<Vec<Witness>>> = Arc::new(vec![Vec::new(); n_tx]);
                // Keep the cache bounded; a full clear is cheap and rare.
                if cache.len() > 512 {
                    cache.clear();
                }
                cache.insert(n_tx, Arc::clone(&fresh));
                fresh
            }
        }
    })
}
// Wall-clock timestamp (ms since Unix epoch) of the last allocator heap trim;
// rate-limits `ibd_maybe_heap_trim` across all threads.
static LAST_IBD_HEAP_TRIM_WALL_MS: AtomicU64 = AtomicU64::new(0);
// Minimum spacing between heap trims.
const IBD_HEAP_TRIM_MIN_INTERVAL_MS: u64 = 2_000;
// Counts blocks retired while under Emergency memory pressure; drives the
// "aggressive evict every Nth block" policy in the retire path.
static IBD_EMERGENCY_EVICT_BLOCKS_SEEN: AtomicU64 = AtomicU64::new(0);
const IBD_EMERGENCY_EVICT_EVERY_N_BLOCKS: u64 = 8;
// Aggressive eviction only runs when at least this many unprotected cache
// entries exist — evicting fewer is not worth the churn.
const IBD_EMERGENCY_EVICT_MIN_UNPROTECTED: usize = 32_768;
/// Asks the allocator to return freed memory to the OS, at most once per
/// `IBD_HEAP_TRIM_MIN_INTERVAL_MS` across all threads.
///
/// The CAS loop elects a single trimmer per interval: the thread that swaps
/// the shared timestamp forward performs the trim; other callers either see a
/// fresh-enough timestamp and return, or retry after a spurious
/// `compare_exchange_weak` failure.
fn ibd_maybe_heap_trim() {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0) /* clock before epoch: treat as 0, trim will fire */;
    loop {
        let prev = LAST_IBD_HEAP_TRIM_WALL_MS.load(Ordering::Relaxed);
        if now_ms.saturating_sub(prev) < IBD_HEAP_TRIM_MIN_INTERVAL_MS {
            // Someone trimmed recently enough; nothing to do.
            return;
        }
        if LAST_IBD_HEAP_TRIM_WALL_MS
            .compare_exchange_weak(prev, now_ms, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            // We won the election for this interval — perform the trim.
            break;
        }
    }
    // SAFETY: mi_collect only triggers allocator housekeeping — presumably
    // callable from any thread per mimalloc's docs; TODO confirm.
    #[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
    unsafe {
        libmimalloc_sys::mi_collect(true);
    }
    // SAFETY: glibc malloc_trim(0) releases free heap pages back to the
    // kernel and is safe to invoke concurrently.
    #[cfg(target_os = "linux")]
    unsafe {
        libc::malloc_trim(0);
    }
}
use super::ibd_staging::empty_utxo_delta;
use super::ParallelIBD;
use blvm_protocol::block::UtxoDelta;
/// Post-validation retire step for one block: run periodic/pressure-driven
/// cache eviction, adapt the cache cap, and decide whether a UTXO flush
/// batch should be taken now.
///
/// Returns `(0, protect_evict_ms, flush_batch, under_rss_pressure)`:
/// the first element is always 0 here (kept for signature stability —
/// TODO confirm against callers), `protect_evict_ms` is time spent in key
/// protection/eviction (non-zero only with the `profile` feature),
/// `flush_batch` is an optional package of pending ops to persist, and the
/// flag is true when pressure is Critical or above.
#[allow(clippy::too_many_arguments)]
pub(crate) fn ibd_v2_retire_apply_utxo_delta(
    next_height: u64,
    store: &IbdUtxoStore,
    blocks_buf: &[Arc<Block>],
    keys_buf: &mut Vec<OutPointKey>,
    keys_seen: &mut rustc_hash::FxHashSet<OutPointKey>,
    evict_scratch: &mut Vec<(OutPointKey, u64)>,
    mem_guard: &mut MemoryGuard,
    max_ahead_live: &Arc<AtomicU64>,
    nominal_max_ahead: u64,
    ibd_defer_flush: bool,
    ibd_defer_checkpoint: u64,
) -> (u64, u64, Option<PendingFlushPackage>, bool) {
    // Cheap periodic eviction sweep every 16 blocks.
    const EVICT_INTERVAL_BLOCKS: u64 = 16;
    if next_height % EVICT_INTERVAL_BLOCKS == 0 {
        store.maybe_evict(evict_scratch);
    }
    #[cfg(feature = "profile")]
    let mut protect_evict_ms: u64 = 0;
    #[cfg(not(feature = "profile"))]
    let protect_evict_ms: u64 = 0;
    if store.is_dynamic_eviction() {
        #[cfg(feature = "profile")]
        let t_protect_evict = std::time::Instant::now();
        // Pin the input keys of upcoming (prefetched) blocks so eviction
        // cannot drop entries validation is about to need.
        block_input_keys_batch_into_arc(blocks_buf, keys_buf, keys_seen);
        store.protect_keys_for_next_blocks(keys_buf);
        store.evict_if_needed(next_height);
        #[cfg(feature = "profile")]
        {
            protect_evict_ms = t_protect_evict.elapsed().as_millis() as u64;
        }
    }
    // Sample memory pressure and publish it for other IBD components.
    let pressure_level = mem_guard.should_flush(Some((max_ahead_live, nominal_max_ahead)));
    memory::publish_ibd_pressure(pressure_level);
    if let Some(new_cap) = mem_guard.compute_adaptive_cache_cap() {
        let old_len = store.len();
        store.tune_max_entries_for_pressure(new_cap, next_height);
        let evicted = old_len.saturating_sub(store.len());
        // After a large eviction, trim the allocator heap immediately and
        // reset the rate-limiter so the next maybe-trim also fires.
        if evicted > 32_768 {
            // SAFETY: allocator housekeeping calls; see ibd_maybe_heap_trim.
            #[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
            unsafe {
                libmimalloc_sys::mi_collect(true);
            }
            #[cfg(target_os = "linux")]
            unsafe {
                libc::malloc_trim(0);
            }
            LAST_IBD_HEAP_TRIM_WALL_MS.store(0, Ordering::Relaxed);
        }
    }
    let rss_pressure = pressure_level >= PressureLevel::Critical;
    let rss_pressure_elevated_only = pressure_level == PressureLevel::Elevated;
    if rss_pressure {
        // Critical/Emergency: force data out of RAM.
        let pending_now = store.pending_len();
        info!(
            "[IBD_V2] height={} RSS pressure ({:?}, cache={}, pending={}), forcing flush",
            next_height,
            pressure_level,
            store.len(),
            pending_now
        );
        if pressure_level == PressureLevel::Emergency {
            // Aggressively evict unprotected entries, but only every Nth
            // block and only if there is enough evictable mass to matter.
            let n = IBD_EMERGENCY_EVICT_BLOCKS_SEEN.fetch_add(1, Ordering::Relaxed);
            if n % IBD_EMERGENCY_EVICT_EVERY_N_BLOCKS == 0 {
                let cache_now = store.len();
                let protected_now = store.protected_len();
                let evictable = cache_now.saturating_sub(protected_now);
                if evictable >= IBD_EMERGENCY_EVICT_MIN_UNPROTECTED {
                    store.evict_aggressive_for_rss();
                }
            }
        }
        // Re-read: the aggressive eviction above may have changed it.
        let pending_now = store.pending_len();
        // Under Critical (not Emergency), avoid forcing tiny batches.
        const CRITICAL_MIN_FLUSH_OPS: usize = 1_000;
        let should_force =
            pressure_level == PressureLevel::Emergency || pending_now >= CRITICAL_MIN_FLUSH_OPS;
        let batch = if should_force {
            store.take_flush_batch_force()
        } else {
            store.maybe_take_flush_batch()
        };
        ibd_maybe_heap_trim();
        (0u64, protect_evict_ms, batch, true)
    } else if rss_pressure_elevated_only {
        // Elevated: flush opportunistically (store's own thresholds decide).
        let batch = store.maybe_take_flush_batch();
        (0u64, protect_evict_ms, batch, false)
    } else if ibd_defer_flush {
        // Deferred-flush mode: force a flush only at checkpoint heights.
        let at_checkpoint = next_height > 0 && next_height % ibd_defer_checkpoint == 0;
        let batch = if at_checkpoint {
            store.take_flush_batch_force()
        } else {
            None
        };
        (0u64, protect_evict_ms, batch, false)
    } else {
        // Normal operation: let the store decide when a batch is worth taking.
        let batch = store.maybe_take_flush_batch();
        (0u64, protect_evict_ms, batch, false)
    }
}
/// True when `height` should emit a profiling sample: a `sample` of 0
/// disables sampling entirely, 1 samples every block, and N samples every
/// Nth block.
#[cfg(feature = "profile")]
#[inline]
fn ibd_profile_height_matches_sample(sample: u64, height: u64) -> bool {
    match sample {
        0 => false,
        1 => true,
        n => height % n == 0,
    }
}
/// Scales the nominal UTXO cache entry cap down according to memory
/// pressure.
///
/// `usize::MAX` is the "unbounded" sentinel and is passed through untouched.
/// Fractions are computed in 128-bit arithmetic so very large caps cannot
/// overflow (the original `nominal * 2` / `nominal * 9` forms wrap for
/// `nominal > usize::MAX / 9` in release builds).
#[inline]
fn dynamic_utxo_cap(level: PressureLevel, nominal: usize) -> usize {
    if nominal == usize::MAX {
        return usize::MAX;
    }
    // Overflow-safe num/den fraction of `nominal`.
    let frac = |num: u128, den: u128| ((nominal as u128) * num / den) as usize;
    match level {
        // Emergency: shrink hard, but keep a small working set alive.
        PressureLevel::Emergency => (nominal / 4).max(8_192),
        // Critical: two thirds (floor at one half kept for parity with the
        // original expression, though 2/3 >= 1/2 always).
        PressureLevel::Critical => frac(2, 3).max(nominal / 2),
        // Elevated: trim modestly.
        PressureLevel::Elevated => frac(9, 10).max(frac(4, 5)),
        PressureLevel::None => nominal,
    }
}
/// Scales the block-prefetch lookahead window for the given memory-pressure
/// level. `nominal` is first clamped into [1, 128].
#[inline]
fn dynamic_prefetch_lookahead(level: PressureLevel, nominal: usize) -> usize {
    let base = nominal.clamp(1, 128);
    match level {
        PressureLevel::None => base,
        PressureLevel::Elevated => {
            // Roughly two thirds, floored at 24, never above the base.
            let trimmed = (base * 2 / 3).max(24);
            trimmed.min(base)
        }
        PressureLevel::Critical => (base / 2).clamp(12, 48),
        // Under emergency pressure, look only a handful of blocks ahead.
        PressureLevel::Emergency => 8,
    }
}
/// Unit of work handed to a retire shard after a block has validated.
pub(crate) struct IbdRetireWork {
    // Height of `block`.
    pub(crate) height: u64,
    // Snapshot of upcoming blocks — presumably the prefetch lookahead window
    // used for input-key protection under dynamic eviction; confirm against
    // the dispatcher that fills it.
    pub(crate) blocks_buf: Vec<Arc<Block>>,
    // The block being retired.
    pub(crate) block: Arc<Block>,
}
/// Number of retire-path flushes grouped between durability boundaries.
///
/// Read from the `BLVM_IBD_RETIRE_FLUSH_BATCH` environment variable; an
/// unset, unparsable, or zero value falls back to the default of 8.
fn retire_flush_batch_size() -> usize {
    const DEFAULT: usize = 8;
    match std::env::var("BLVM_IBD_RETIRE_FLUSH_BATCH") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(n) if n >= 1 => n,
            _ => DEFAULT,
        },
        Err(_) => DEFAULT,
    }
}
/// Queues (or synchronously performs) a UTXO flush produced by the retire
/// path.
///
/// Flushes normally run on background threads, bounded by the live
/// concurrency cap (the oldest in-flight flush is joined when the queue is
/// full). Every `retire_flush_batch_size()`-th call is a *durability
/// boundary*: all in-flight flushes are drained, the package is written and
/// synced (`flush_disk`), and the chain-side IBD checkpoint (max flushed
/// height plus MuHash running state) is persisted so a crash can resume from
/// the last boundary.
///
/// Returns an error if any flush thread returned an error or panicked.
fn push_utxo_flush_from_retire(
    store: &Arc<IbdUtxoStore>,
    storage_wm: &Arc<Storage>,
    utxo_flush_handles: &Arc<Mutex<VecDeque<JoinHandle<Result<()>>>>>,
    retire_flush_counter: &Arc<AtomicUsize>,
    next_height: u64,
    max_utxo_flushes_in_flight: usize,
    pkg: PendingFlushPackage,
    ibd_muhash: &Arc<Mutex<blvm_muhash::MuHash3072>>,
) -> Result<()> {
    // Effective concurrency cap may be reduced under memory pressure.
    let flush_limit = memory::utxo_flush_concurrency_cap(max_utxo_flushes_in_flight).max(1);
    let batch_count = retire_flush_batch_size();
    let n = retire_flush_counter.fetch_add(1, Ordering::Relaxed);
    // Durability boundary on every batch_count-th flush (or on every flush
    // when batching is effectively disabled).
    let do_durability = batch_count <= 1 || n % batch_count == 0;
    let mut q = utxo_flush_handles.lock();
    // Backpressure: join the oldest flush until we are under the limit.
    while q.len() >= flush_limit {
        let Some(handle) = q.pop_front() else {
            return Err(anyhow::anyhow!(
                "IBD invariant violated: UTXO flush wait queue empty under backpressure"
            ));
        };
        match handle.join() {
            Ok(Ok(())) => {}
            Ok(Err(e)) => return Err(e),
            Err(e) => {
                return Err(anyhow::anyhow!("UTXO flush panicked: {:?}", e));
            }
        }
    }
    let batch_size = pkg.ops.len();
    let heights = Arc::clone(&pkg.heights);
    if do_durability {
        // Drain every outstanding async flush first, so the checkpoint we
        // persist below covers all previously queued work.
        while let Some(handle) = q.pop_front() {
            match handle.join() {
                Ok(Ok(())) => {}
                Ok(Err(e)) => return Err(e),
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "UTXO flush panicked while draining for durability: {:?}",
                        e
                    ));
                }
            }
        }
        drop(q);
        let prepared = pkg.prepare_for_disk()?;
        // Apply the package while holding the MuHash lock so the serialized
        // running state matches exactly what was flushed.
        let muhash_running = {
            let mut mh_guard = ibd_muhash.lock();
            store.flush_prepared_package(&prepared, Some(&mut *mh_guard))?;
            mh_guard.serialize_running_state()
        };
        store.flush_disk()?;
        storage_wm
            .chain()
            .persist_ibd_utxo_flush_checkpoint(prepared.max_block_height, &muhash_running)?;
        // Flushed heights no longer need eviction protection.
        store.release_protected_heights(&heights);
        store.note_utxo_flush_completed(prepared.max_block_height);
        debug!(
            "[IBD_DEBUG] Block {}: durability flush boundary (batch_size={}, n={})",
            next_height, batch_size, n,
        );
    } else {
        // Async commit: apply on a worker thread; durability deferred to the
        // next boundary.
        let store_clone = Arc::clone(store);
        let mh_acc = Arc::clone(ibd_muhash);
        q.push_back(std::thread::spawn(move || {
            let prepared = pkg.prepare_for_disk()?;
            {
                let mut mh_guard = mh_acc.lock();
                store_clone.flush_prepared_package(&prepared, Some(&mut *mh_guard))?;
            }
            store_clone.release_protected_heights(&heights);
            store_clone.note_utxo_flush_completed(prepared.max_block_height);
            Ok(())
        }));
        debug!(
            "[IBD_DEBUG] Block {}: async commit (batch_size={}, in_flight={}, n={})",
            next_height,
            batch_size,
            q.len(),
            n,
        );
    }
    Ok(())
}
/// Stops all retire shards, then surfaces any error a shard recorded.
///
/// Joining first guarantees every shard has finished writing into
/// `retire_err` before it is read.
fn retire_thread_shutdown(
    retire_dispatcher: &mut super::retire_dispatcher::RetireDispatcher,
    retire_err: &Arc<Mutex<Option<anyhow::Error>>>,
) -> Result<()> {
    retire_dispatcher.shutdown_and_join()?;
    match retire_err.lock().take() {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
/// Everything a validation worker needs to validate one block in isolation.
struct ValidateJob {
    height: u64,
    block_arc: Arc<Block>,
    // Per-tx witness stacks (shared all-empty placeholder when the block
    // carries no witness data).
    witnesses_storage: Arc<Vec<Vec<Witness>>>,
    // BIP30 duplicate-txid index snapshot; passed mutably to validation and
    // returned in the result so the coordinator can adopt it in order.
    bip30_index: Bip30Index,
    // Recent headers for contextual checks; may be empty (None is passed).
    recent_headers: Vec<Arc<BlockHeader>>,
    // Precomputed txids for this block.
    tx_ids: Vec<Hash>,
    // Wall-clock time captured at enqueue so workers skip the syscall.
    cached_network_time: u64,
    // Outpoint keys of all inputs this block spends.
    keys: Vec<OutPointKey>,
    // Speculative additions from earlier in-flight blocks as (height, set)
    // pairs; consulted newest-first when resolving inputs.
    spec_adds_snapshot: Vec<(u64, Arc<UtxoSet>)>,
    // UTXOs the feeder already fetched for `keys` (first lookup tier).
    prefetched: rustc_hash::FxHashMap<OutPointKey, Arc<UTXO>>,
}
/// Outcome of validating one block, sent back to the coordinator.
struct ValidateResult {
    height: u64,
    // Ok(Some(delta)) on success with UTXO changes; Err on validation
    // failure. When Ok(None) can occur is not visible here — TODO confirm
    // against validate_block_only.
    result: Result<Option<UtxoDelta>>,
    // BIP30 index state after this block (adopted by coordinator in order).
    bip30_post: Bip30Index,
    // Time spent in block validation proper.
    elapsed: std::time::Duration,
    // Time spent assembling the UTXO base view before validation.
    view_build_ms: u64,
}
/// Coordinator-side bookkeeping for a block whose validation is in flight.
struct InFlightEntry {
    height: u64,
    block_arc: Arc<Block>,
    witnesses_storage: Arc<Vec<Vec<Witness>>>,
    // Feeder's byte-size estimate for this block (buffer accounting).
    feeder_est_bytes: usize,
    // Per-stage timing slots; initialized to zero at enqueue.
    utxo_base_ms: u64,
    utxo_base_tune_ms: u64,
    prefetch_ms: u64,
    apply_pending_ms: u64,
    // Initialized to None at enqueue in this file; who fills it is not
    // visible here — TODO confirm downstream consumer.
    input_keys: Option<Vec<OutPointKey>>,
}
/// Body of one validation worker thread.
///
/// Receives `ValidateJob`s until the channel closes. For each job it builds
/// a UTXO base view for the block's inputs by consulting, in order:
/// (1) UTXOs prefetched by the feeder, (2) the shared IBD store cache,
/// (3) speculative additions from earlier in-flight blocks (newest first),
/// (4) the store's slower supplement path for anything still missing.
/// It then validates the block and, on success, publishes the block's new
/// outputs into the store (eviction-protected) and applies the delta,
/// honoring the adaptive pending-ops backpressure cap first.
///
/// NOTE(review): `last_retired` is not read anywhere in this body — looks
/// vestigial; confirm before removing it from the signature and call site.
#[allow(clippy::too_many_arguments)]
fn run_validation_worker_shared(
    rx: crossbeam_channel::Receiver<ValidateJob>,
    tx: mpsc::Sender<ValidateResult>,
    parallel_ibd: Arc<super::ParallelIBD>,
    blockstore: Arc<crate::storage::blockstore::BlockStore>,
    protocol: Arc<blvm_protocol::BitcoinProtocolEngine>,
    store: Arc<IbdUtxoStore>,
    last_retired: Arc<AtomicU64>,
    max_pending_ops: Arc<AtomicUsize>,
) {
    // Scratch buffers reused across jobs to avoid per-block allocation.
    let mut utxo_base: UtxoSet = UtxoSet::default();
    let mut supplement_cache_buf: Vec<OutPointKey> = Vec::new();
    let mut keys_missing_buf: Vec<OutPointKey> = Vec::new();
    let mut del_scratch: Vec<OutPointKey> = Vec::new();
    let mut add_scratch: Vec<(OutPointKey, Arc<UTXO>)> = Vec::new();
    loop {
        // Channel closes when the coordinator drops the sender: exit cleanly.
        let mut job = match rx.recv() {
            Ok(j) => j,
            Err(_) => break,
        };
        let height = job.height;
        let t_view = std::time::Instant::now();
        utxo_base.clear();
        utxo_base.reserve(job.keys.len());
        let still_missing = &mut keys_missing_buf;
        still_missing.clear();
        // Tier 1: feeder-prefetched UTXOs.
        for k in job.keys.iter() {
            let op = key_to_outpoint(k);
            if let Some(arc) = job.prefetched.get(k) {
                utxo_base.insert(op, Arc::clone(arc));
            } else {
                still_missing.push(*k);
            }
        }
        // Tier 2: shared store cache.
        if !still_missing.is_empty() {
            still_missing.retain(|k| {
                if let Some(ref r) = store.cache_get(k) {
                    let op = key_to_outpoint(k);
                    utxo_base.insert(op, Arc::clone(&r.utxo));
                    return false;
                }
                true
            });
        }
        // Tier 3: speculative additions from earlier in-flight blocks,
        // scanned newest-first so the most recent creation wins.
        if !still_missing.is_empty() && !job.spec_adds_snapshot.is_empty() {
            still_missing.retain(|k| {
                let op = key_to_outpoint(k);
                for (_sh, set) in job.spec_adds_snapshot.iter().rev() {
                    if let Some(u) = set.get(&op) {
                        utxo_base.insert(op, Arc::clone(u));
                        return false;
                    }
                }
                true
            });
        }
        // Tier 4: store supplement path for anything still unresolved.
        if !still_missing.is_empty() {
            store.supplement_utxo_map_with_buf(
                &mut utxo_base,
                still_missing,
                &mut supplement_cache_buf,
            );
        }
        let view_build_ms = t_view.elapsed().as_millis() as u64;
        let recent_opt: Option<&[Arc<BlockHeader>]> = if job.recent_headers.is_empty() {
            None
        } else {
            Some(job.recent_headers.as_slice())
        };
        let t_val = std::time::Instant::now();
        let raw = parallel_ibd.validate_block_only(
            &blockstore,
            protocol.as_ref(),
            &mut utxo_base,
            Some(&mut job.bip30_index),
            job.block_arc.as_ref(),
            Some(Arc::clone(&job.block_arc)),
            job.witnesses_storage.as_slice(),
            Some(&job.witnesses_storage),
            job.height,
            recent_opt,
            job.cached_network_time,
            Some(&job.tx_ids),
        );
        let elapsed = t_val.elapsed();
        let result = raw.map(|(_ids, delta)| delta);
        if let Ok(Some(ref delta)) = &result {
            // Publish this block's new outputs (eviction-protected) so later
            // in-flight blocks can spend them.
            store.worker_cache_put_protected(&delta.additions, height);
            // Backpressure: if a cap is set, wait (yield first, then 1 ms
            // sleeps) until the pending-op queue drains below the live cap,
            // which is re-read each iteration since it adapts concurrently.
            let cap = max_pending_ops.load(Ordering::Relaxed);
            if cap > 0 {
                let mut spins = 0u32;
                while store.pending_len() > max_pending_ops.load(Ordering::Relaxed) {
                    if spins < 8 {
                        std::thread::yield_now();
                    } else {
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                    spins = spins.saturating_add(1);
                }
            }
            store.apply_utxo_delta(delta, height, &mut del_scratch, &mut add_scratch, true);
        }
        // Best-effort delivery: the coordinator may already be shutting down.
        let _ = tx.send(ValidateResult {
            height,
            result,
            bip30_post: job.bip30_index,
            elapsed,
            view_build_ms,
        });
    }
}
/// Periodically adapts the pending-ops backpressure cap to memory pressure.
///
/// At most one caller per 500 ms tick performs the adaptation (elected via
/// CAS on `last_adapt_ms`). The cap shrinks under Critical/Emergency
/// pressure (with floors so the pipeline never fully stalls), holds under
/// Elevated, and grows ~10% at a time when pressure is off and the drain is
/// keeping up (pending below a quarter of the cap), capped at 110% of
/// `nominal`. A `nominal` of 0 means backpressure is disabled: no-op.
fn adapt_max_pending_ops_tick(
    cap: &AtomicUsize,
    nominal: usize,
    pressure: PressureLevel,
    pending_len: usize,
    last_adapt_ms: &AtomicU64,
) {
    if nominal == 0 {
        return;
    }
    const TICK_INTERVAL_MS: u64 = 500;
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    let last = last_adapt_ms.load(Ordering::Relaxed);
    if now_ms.saturating_sub(last) < TICK_INTERVAL_MS {
        return;
    }
    // Single adapter per tick: CAS losers simply skip this round.
    if last_adapt_ms
        .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
        .is_err()
    {
        return;
    }
    let current = cap.load(Ordering::Relaxed);
    let new = match pressure {
        PressureLevel::Emergency => {
            // Halve, floored at nominal/16 and an absolute minimum.
            (current / 2).max(nominal / 16).max(100_000)
        }
        PressureLevel::Critical => (current * 3 / 4).max(nominal / 8).max(500_000),
        PressureLevel::Elevated => current,
        PressureLevel::None => {
            if pending_len < current / 4 {
                // Grow 10% (u128 math avoids overflow), capped at 110% of
                // nominal and floored at nominal/4.
                let grown = (current as u128).saturating_mul(11) / 10;
                let max = (nominal as u128).saturating_mul(11) / 10;
                grown.min(max).max(nominal as u128 / 4) as usize
            } else {
                current
            }
        }
    };
    if new != current {
        cap.store(new, Ordering::Relaxed);
        // Log shrinks under pressure and genuine growth; skip noisy no-ops.
        if matches!(pressure, PressureLevel::Critical | PressureLevel::Emergency)
            || (pressure == PressureLevel::None && new > current)
        {
            tracing::debug!(
                "[IBD_ADAPT] max_pending_ops {} → {} (pressure={:?}, pending={}, nominal={})",
                current,
                new,
                pressure,
                pending_len,
                nominal
            );
        }
    }
}
/// Bundled inputs for `run_validation_loop` (a struct instead of a very long
/// parameter list).
pub struct ValidationParams {
    // Shared block buffer + condvar state fed by the block feeder.
    pub feeder_state: FeederState,
    pub ibd_store: Arc<IbdUtxoStore>,
    pub blockstore: Arc<BlockStore>,
    pub storage: Arc<Storage>,
    pub parallel_ibd: Arc<ParallelIBD>,
    pub protocol: Arc<BitcoinProtocolEngine>,
    // Bound to `_utxo_mutex` and otherwise unused by this loop — presumably
    // kept for interface parity; confirm before removing.
    pub utxo_mutex: Arc<std::sync::Mutex<UtxoSet>>,
    pub effective_end_height: u64,
    pub start_height: u64,
    // Published current validation height (readers not visible here).
    pub validation_height: Arc<std::sync::atomic::AtomicU64>,
    pub mem_guard: MemoryGuard,
    // Live (pressure-adjusted) max-ahead window and its nominal value.
    pub max_ahead_live: Arc<std::sync::atomic::AtomicU64>,
    pub nominal_max_ahead: u64,
    pub utxo_nominal_max_entries: usize,
    pub utxo_prefetch_lookahead: usize,
    // Broadcasts heights at which validation stalled waiting on the feeder.
    pub stall_tx: tokio::sync::broadcast::Sender<u64>,
}
pub fn run_validation_loop(params: ValidationParams) -> Result<()> {
let feeder_state = params.feeder_state;
let ibd_store_v2_for_validation = params.ibd_store;
let blockstore = params.blockstore;
let storage_clone = params.storage;
let parallel_ibd = params.parallel_ibd;
let protocol = params.protocol;
let _utxo_mutex = params.utxo_mutex;
let effective_end_height = params.effective_end_height;
let start_height = params.start_height;
let validation_height = params.validation_height;
let mem_guard = params.mem_guard;
let system_total_ram_mb = mem_guard.system_total_ram_mb();
let spec_adds_bytes = Arc::clone(&mem_guard.spec_adds_bytes);
let mem_mtx = Arc::new(Mutex::new(mem_guard));
let max_ahead_live = params.max_ahead_live;
let nominal_max_ahead = params.nominal_max_ahead;
let utxo_nominal_max_entries = params.utxo_nominal_max_entries;
let stall_tx = params.stall_tx;
let nominal_prefetch_lookahead = params.utxo_prefetch_lookahead.clamp(1, 128);
let utxo_prefetch_lookahead_live = AtomicUsize::new(nominal_prefetch_lookahead);
let mut blocks_synced = 0;
let validation_start = std::time::Instant::now();
#[cfg(feature = "profile")]
let (ibd_profile_sample, ibd_profile_slow_ms, ibd_profile, ibd_disk_profile, ibd_blocked_log) = {
let mut sample: u64 = 0;
let mut slow: u64 = 0;
let mut disk = false;
let mut blocked_log = false;
if let Ok(val) = std::env::var("BLVM_IBD_DEBUG") {
let parts: Vec<&str> = val.split(',').map(|s| s.trim()).collect();
let full = parts.iter().any(|p| *p == "full");
for p in &parts {
let p = *p;
if p == "full" {
sample = sample.max(1);
disk = true;
blocked_log = true;
} else if p == "profile" {
sample = sample.max(1);
} else if let Some(rest_s) = p.strip_prefix("profile:") {
let rest: Vec<&str> = rest_s.split(':').collect();
if !rest.is_empty() && !rest[0].is_empty() {
if let Ok(n) = rest[0].parse::<u64>() {
if rest.len() >= 2 && !rest[1].is_empty() {
sample = sample.max(n.max(1));
if let Ok(s) = rest[1].parse::<u64>() {
slow = s;
}
} else if n < 100 {
sample = sample.max(1);
slow = n;
} else {
sample = sample.max(n);
}
}
}
} else if p == "blocked" {
blocked_log = true;
} else if p == "disk" {
disk = true;
}
}
if full && sample == 0 {
sample = 1;
disk = true;
blocked_log = true;
}
if sample > 0 && !blocked_log {
blocked_log = true; }
}
let on = sample > 0 || disk;
if on {
info!("IBD profiling ENABLED (BLVM_IBD_DEBUG): sample_interval={}, slow_threshold_ms={}, disk_io={}, blocked_log={}", sample, slow, disk, blocked_log);
}
if blocked_log {
info!("IBD_BLOCKED_LOG ENABLED: every validation-blocking stall will be logged");
}
(sample, slow, on, disk, blocked_log)
};
#[cfg(not(feature = "profile"))]
let (ibd_profile_sample, ibd_profile_slow_ms, ibd_profile, ibd_disk_profile, ibd_blocked_log) =
(0u64, 0u64, false, false, false);
let mut recent_headers_buf: VecDeque<Arc<BlockHeader>> = VecDeque::with_capacity(12);
let mut recent_snap_buf: Vec<Arc<BlockHeader>> = Vec::with_capacity(12);
let (storage_flush_interval, ibd_budget_mb) = {
let g = mem_mtx.lock();
(g.storage_flush_interval, g.budget_mb())
};
let mut pending_blocks: Vec<(Arc<Block>, Arc<Vec<Vec<Witness>>>, u64)> =
Vec::with_capacity(storage_flush_interval);
let mut pending_storage_bytes: u64 = 0;
let skip_storage = false;
let initial_buffer_limit = mem_mtx.lock().buffer_limit(start_height);
info!(
"Validation loop starting (deferred storage: flush every ~{} blocks [pressure-scaled], extra flush under Critical/Emergency when pending bytes exceed budget cap, initial buffer limit: {}, utxo_prefetch_lookahead_nominal: {})...",
storage_flush_interval,
initial_buffer_limit,
nominal_prefetch_lookahead,
);
let mut next_validation_height = start_height;
let mut flush_handles: VecDeque<std::thread::JoinHandle<Result<()>>> = VecDeque::new();
let utxo_flush_handles = Arc::new(Mutex::new(
VecDeque::<std::thread::JoinHandle<Result<()>>>::new(),
));
let retire_flush_counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let (max_block_flushes_in_flight, max_utxo_flushes_under_pressure) = {
let g = mem_mtx.lock();
(g.max_block_flushes, g.max_utxo_flushes)
};
let ibd_defer_flush = mem_mtx.lock().defer_flush;
let ibd_defer_checkpoint = mem_mtx.lock().defer_checkpoint_interval;
let mut blocks_buf: Vec<Arc<Block>> = Vec::with_capacity(nominal_prefetch_lookahead.max(8));
let mut keys_buf: Vec<OutPointKey> = Vec::new();
let mut keys_seen: rustc_hash::FxHashSet<OutPointKey> = rustc_hash::FxHashSet::default();
let mut keys_v2_buf: Vec<OutPointKey> = Vec::new();
let mut in_flight: VecDeque<InFlightEntry> = VecDeque::with_capacity(64);
let mut pending_results: BTreeMap<u64, ValidateResult> = BTreeMap::new();
let mut spec_adds: std::collections::BTreeMap<u64, Arc<UtxoSet>> =
std::collections::BTreeMap::new();
let snapshot_dir_base: Option<String> = std::env::var("BLVM_IBD_SNAPSHOT_DIR").ok();
let ibd_bps_csv_path: Option<String> = std::env::var("BLVM_IBD_BPS_CSV").ok();
let yield_interval: u64 = std::env::var("BLVM_IBD_YIELD_INTERVAL")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1000);
let mut bip30_index = Bip30Index::default();
let staged: Arc<Mutex<BTreeMap<u64, Arc<UtxoDelta>>>> = Arc::new(Mutex::new(BTreeMap::new()));
let retire_err: Arc<Mutex<Option<anyhow::Error>>> = Arc::new(Mutex::new(None));
let num_retire_shards = super::retire_dispatcher::configured_retire_shards();
if num_retire_shards > 1 {
info!(
"[IBD_RETIRE] sharded retire enabled: BLVM_IBD_RETIRE_SHARDS={} (workers contend on \
mem_mtx/mh_acc; sweet spot is 2..=4)",
num_retire_shards
);
}
let mut last_log_blocks: u64 = 0;
let mut last_log_instant = std::time::Instant::now();
let mut last_rss_mb: u64 = 0;
let mut last_collect_block: u64 = 0;
let mut prefetch_base_ema: Option<f64> = None;
let mut spec_adds_snapshot_buf: Vec<(u64, Arc<UtxoSet>)> = Vec::with_capacity(64);
#[cfg(all(feature = "utxo-commitments", feature = "production"))]
let (commitment_tree_shared, commitment_store_opt) = {
let pm = storage_clone.pruning();
let tree = pm
.as_ref()
.and_then(|p| p.commitment_store())
.and_then(|_| blvm_protocol::utxo_commitments::merkle_tree::UtxoMerkleTree::new().ok());
let store = pm.and_then(|p| p.commitment_store());
if tree.is_some() && store.is_some() {
info!("IBD: incremental UTXO commitment enabled (applying delta per block)");
}
(tree.map(|t| Arc::new(Mutex::new(t))), store)
};
#[cfg(not(all(feature = "utxo-commitments", feature = "production")))]
#[allow(unused_variables)]
let (commitment_tree_shared, commitment_store_opt) = (None::<()>, None::<()>);
let storage_for_retire = Arc::clone(&storage_clone);
let ibd_muhash_accumulator: Arc<Mutex<blvm_muhash::MuHash3072>> = Arc::new(Mutex::new(
crate::storage::ibd_utxo_muhash::load_ibd_muhash_from_chain(storage_clone.chain())?,
));
let max_pending_ops_nominal: usize = std::env::var("BLVM_IBD_MAX_PENDING_OPS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| {
let total_gb = (system_total_ram_mb + 512) / 1024;
if total_gb >= 32 {
0
} else if total_gb >= 24 {
16_000_000
} else if total_gb >= 16 {
8_000_000
} else if total_gb >= 12 {
6_000_000
} else {
4_000_000
}
});
let max_pending_ops: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(max_pending_ops_nominal));
let max_pending_ops_last_adapt_ms: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
if max_pending_ops_nominal > 0 {
info!(
"IBD: pending-ops backpressure active (max_pending_ops={}, adaptive: shrinks under \
RSS pressure, grows when drain keeps up; bounds [nominal/16, 2*nominal])",
max_pending_ops_nominal
);
} else {
info!("IBD: pending-ops backpressure disabled (32 GiB+ tier — full pipeline fits in RAM)");
}
#[cfg(all(feature = "utxo-commitments", feature = "production"))]
let mut _retire_dispatcher = {
let staged_outer = Arc::clone(&staged);
let store_outer = Arc::clone(&ibd_store_v2_for_validation);
let mem_mtx_outer = Arc::clone(&mem_mtx);
let utxo_flush_handles_outer = Arc::clone(&utxo_flush_handles);
let retire_flush_counter_outer = Arc::clone(&retire_flush_counter);
let max_ahead_live_outer = Arc::clone(&max_ahead_live);
let blockstore_outer = Arc::clone(&blockstore);
let ctree_outer = commitment_tree_shared.clone();
let cst_outer = commitment_store_opt.clone();
let storage_wm_outer = Arc::clone(&storage_for_retire);
let ibd_mh_outer = Arc::clone(&ibd_muhash_accumulator);
let retire_err_outer = Arc::clone(&retire_err);
let mpo_outer = Arc::clone(&max_pending_ops);
let mpo_last_outer = Arc::clone(&max_pending_ops_last_adapt_ms);
super::retire_dispatcher::RetireDispatcher::spawn(
num_retire_shards,
start_height.saturating_sub(1),
|i, work_rx, local_last_retired, publisher| {
let staged = Arc::clone(&staged_outer);
let store = Arc::clone(&store_outer);
let mem_mtx = Arc::clone(&mem_mtx_outer);
let utxo_flush_handles = Arc::clone(&utxo_flush_handles_outer);
let retire_flush_counter = Arc::clone(&retire_flush_counter_outer);
let max_ahead_live = Arc::clone(&max_ahead_live_outer);
let blockstore = Arc::clone(&blockstore_outer);
let ctree = ctree_outer.clone();
let cst = cst_outer.clone();
let storage_wm = Arc::clone(&storage_wm_outer);
let ibd_mh = Arc::clone(&ibd_mh_outer);
let retire_err = Arc::clone(&retire_err_outer);
let mpo = Arc::clone(&mpo_outer);
let mpo_last = Arc::clone(&mpo_last_outer);
std::thread::Builder::new()
.name(format!("ibd-retire-{i}"))
.spawn(move || {
run_ibd_retire_loop_with_commitment(
work_rx,
staged,
local_last_retired,
publisher,
store,
storage_wm,
mem_mtx,
max_ahead_live,
nominal_max_ahead,
ibd_defer_flush,
ibd_defer_checkpoint,
max_utxo_flushes_under_pressure,
utxo_flush_handles,
retire_flush_counter,
retire_err,
blockstore,
ctree,
cst,
ibd_mh,
mpo,
max_pending_ops_nominal,
mpo_last,
);
})
.expect("spawn IBD retire shard")
},
)
};
#[cfg(not(all(feature = "utxo-commitments", feature = "production")))]
let mut _retire_dispatcher = {
let staged_outer = Arc::clone(&staged);
let store_outer = Arc::clone(&ibd_store_v2_for_validation);
let mem_mtx_outer = Arc::clone(&mem_mtx);
let utxo_flush_handles_outer = Arc::clone(&utxo_flush_handles);
let retire_flush_counter_outer = Arc::clone(&retire_flush_counter);
let max_ahead_live_outer = Arc::clone(&max_ahead_live);
let storage_wm_outer = Arc::clone(&storage_for_retire);
let ibd_mh_outer = Arc::clone(&ibd_muhash_accumulator);
let retire_err_outer = Arc::clone(&retire_err);
let mpo_outer = Arc::clone(&max_pending_ops);
let mpo_last_outer = Arc::clone(&max_pending_ops_last_adapt_ms);
super::retire_dispatcher::RetireDispatcher::spawn(
num_retire_shards,
start_height.saturating_sub(1),
|i, work_rx, local_last_retired, publisher| {
let staged = Arc::clone(&staged_outer);
let store = Arc::clone(&store_outer);
let mem_mtx = Arc::clone(&mem_mtx_outer);
let utxo_flush_handles = Arc::clone(&utxo_flush_handles_outer);
let retire_flush_counter = Arc::clone(&retire_flush_counter_outer);
let max_ahead_live = Arc::clone(&max_ahead_live_outer);
let storage_wm = Arc::clone(&storage_wm_outer);
let ibd_mh = Arc::clone(&ibd_mh_outer);
let retire_err = Arc::clone(&retire_err_outer);
let mpo = Arc::clone(&mpo_outer);
let mpo_last = Arc::clone(&mpo_last_outer);
std::thread::Builder::new()
.name(format!("ibd-retire-{i}"))
.spawn(move || {
run_ibd_retire_loop_no_commitment(
work_rx,
staged,
local_last_retired,
publisher,
store,
storage_wm,
mem_mtx,
max_ahead_live,
nominal_max_ahead,
ibd_defer_flush,
ibd_defer_checkpoint,
max_utxo_flushes_under_pressure,
utxo_flush_handles,
retire_flush_counter,
retire_err,
ibd_mh,
mpo,
max_pending_ops_nominal,
mpo_last,
);
})
.expect("spawn IBD retire shard")
},
)
};
let last_retired: Arc<AtomicU64> = Arc::clone(_retire_dispatcher.global_last_retired());
let n_validate_workers: usize = std::env::var("BLVM_IBD_MAX_PARALLEL")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| {
let cpus = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let total_gb = (system_total_ram_mb + 512) / 1024;
if total_gb >= 32 {
cpus.saturating_sub(1).clamp(4, 24)
} else if total_gb >= 24 {
(cpus * 3 / 4).clamp(2, 16)
} else if total_gb >= 16 {
cpus.saturating_sub(2).clamp(4, 16)
} else {
(cpus / 2).clamp(1, 6)
}
});
let n_pipeline_depth: usize = std::env::var("BLVM_IBD_PIPELINE_DEPTH")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| 32_usize.clamp(n_validate_workers, 64))
.max(n_validate_workers);
info!(
"IBD: n_validate_workers={} pipeline_depth={}",
n_validate_workers, n_pipeline_depth
);
let (valjob_tx, valjob_rx) = crossbeam_channel::unbounded::<ValidateJob>();
let (valres_tx, valres_rx) = mpsc::channel::<ValidateResult>();
let mut _validate_workers: Vec<JoinHandle<()>> = Vec::with_capacity(n_validate_workers);
for i in 0..n_validate_workers {
let rx = valjob_rx.clone();
let tx = valres_tx.clone();
let pi = Arc::clone(¶llel_ibd);
let bs = Arc::clone(&blockstore);
let pr = Arc::clone(&protocol);
let st = Arc::clone(&ibd_store_v2_for_validation);
let lr = Arc::clone(&last_retired);
let mpo = Arc::clone(&max_pending_ops);
_validate_workers.push(
std::thread::Builder::new()
.name(format!("ibd-validate-{i}"))
.spawn(move || run_validation_worker_shared(rx, tx, pi, bs, pr, st, lr, mpo))
.expect("spawn IBD validate worker"),
);
}
drop(valjob_rx); drop(valres_tx);
loop {
let pipeline_depth_live: usize = if (91710..=91855).contains(&next_validation_height) {
1
} else {
n_pipeline_depth
};
while in_flight.len() < pipeline_depth_live {
let is_first = in_flight.is_empty();
let block_tuple_opt = if is_first {
const FEEDER_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
let next_block = loop {
let mut guard = feeder_state.0.lock();
if let Some((arc_b, w, input_keys, u, tx_ids, spec_adds, est_bytes)) =
guard.0.remove(next_validation_height)
{
guard.2 = guard.2.saturating_sub(est_bytes);
feeder_state.1.notify_one();
break Some((
next_validation_height,
arc_b,
w,
input_keys,
u,
tx_ids,
spec_adds,
est_bytes,
));
}
if guard.1 && guard.0.is_empty() {
break None;
}
#[cfg(feature = "profile")]
let wait_start = std::time::Instant::now();
let wait = feeder_state.1.wait_for(&mut guard, FEEDER_WAIT_TIMEOUT);
#[cfg(feature = "profile")]
if ibd_profile {
let wait_ms = wait_start.elapsed().as_millis() as u64;
if wait_ms >= 1 {
let buffer_len_after = guard.0.len();
let ts_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
blvm_protocol::profile_log!(
"[IBD_STALL_WAIT] next_height={} duration_ms={} buffer_after={} ts_ms={}",
next_validation_height, wait_ms, buffer_len_after, ts_ms
);
}
}
if wait.timed_out() {
let cur_min = guard.0.min_buffered_height();
warn!(
"[IBD_STALL] Validation waiting for block {} (buffer has {} blocks, min_height={:?}) — coordinator/feeder may be blocked",
next_validation_height, guard.0.len(), cur_min
);
let _ = stall_tx.send(next_validation_height);
}
};
next_block
} else {
let next_h = next_validation_height;
let mut guard = feeder_state.0.lock();
guard
.0
.remove(next_h)
.map(|(arc_b, w, ik, u, tx_ids, spec_adds, est_bytes)| {
guard.2 = guard.2.saturating_sub(est_bytes);
feeder_state.1.notify_one();
(next_h, arc_b, w, ik, u, tx_ids, spec_adds, est_bytes)
})
};
let (
h,
block_arc_d,
witnesses_d,
mut input_keys_from_feeder,
prefetched_utxos_d,
tx_ids_precomputed_d,
spec_adds_d,
feeder_est_bytes_d,
) = match block_tuple_opt {
None => break,
Some(t) => t,
};
if blocks_synced == 0 && in_flight.is_empty() {
info!("Validation: first block received, height {}", h);
}
let need_blocks_buf = ibd_store_v2_for_validation.is_dynamic_eviction();
if need_blocks_buf {
blocks_buf.clear();
let guard = feeder_state.0.lock();
let prefetch_look = utxo_prefetch_lookahead_live
.load(Ordering::Relaxed)
.clamp(1, 128);
for off in 1..=prefetch_look {
let bh = h + off as u64;
if let Some((b, _, _, _, _, _, _)) = guard.0.get(bh) {
blocks_buf.push(Arc::clone(b));
}
}
}
let witnesses_storage_d: Arc<Vec<Vec<Witness>>> = if witnesses_d.is_empty() {
shared_empty_witness_stacks(block_arc_d.transactions.len())
} else if witnesses_d.len() != block_arc_d.transactions.len() {
return match retire_thread_shutdown(&mut _retire_dispatcher, &retire_err) {
Ok(()) => Err(anyhow::anyhow!(
"Witness count mismatch at height {}: {} witnesses for {} transactions",
h,
witnesses_d.len(),
block_arc_d.transactions.len()
)),
Err(e) => Err(e),
};
} else {
Arc::new(witnesses_d)
};
if input_keys_from_feeder.is_empty() {
block_input_keys_into_filtered_with_tx_ids(
block_arc_d.as_ref(),
tx_ids_precomputed_d.as_slice(),
&mut keys_v2_buf,
);
} else {
std::mem::swap(&mut keys_v2_buf, &mut input_keys_from_feeder);
}
if h <= 200 {
debug!(
"[IBD_V2] height={} keys_needed={} store_len={}",
h,
keys_v2_buf.len(),
ibd_store_v2_for_validation.len()
);
}
spec_adds_snapshot_buf.clear();
spec_adds_snapshot_buf.extend(spec_adds.iter().map(|(sh, set)| (*sh, Arc::clone(set))));
let spec_adds_snapshot =
std::mem::replace(&mut spec_adds_snapshot_buf, Vec::with_capacity(64));
if let Some(ref base) = snapshot_dir_base {
const SNAPSHOT_HEIGHTS: &[u64] = &[
50_000, 90_000, 125_000, 133_000, 145_000, 175_000, 181_000, 190_000, 200_000,
];
if SNAPSHOT_HEIGHTS.contains(&h) {
let utxo_set = ibd_store_v2_for_validation.to_utxo_set_snapshot();
ParallelIBD::dump_ibd_snapshot(
h,
block_arc_d.as_ref(),
witnesses_storage_d.as_slice(),
&utxo_set,
base,
);
}
}
recent_snap_buf.clear();
recent_snap_buf.extend(recent_headers_buf.iter().cloned());
let recent_snap = std::mem::replace(&mut recent_snap_buf, Vec::with_capacity(12));
let cached_network_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let spec_arc: Arc<UtxoSet> = spec_adds_d;
let spec_entry_bytes = (spec_arc.len() as u64).saturating_mul(64);
spec_adds_bytes.fetch_add(spec_entry_bytes, Ordering::Relaxed);
spec_adds.insert(h, Arc::clone(&spec_arc));
let keys_for_job: Vec<OutPointKey> = std::mem::take(&mut keys_v2_buf);
let job_send = valjob_tx.send(ValidateJob {
height: h,
block_arc: Arc::clone(&block_arc_d),
witnesses_storage: Arc::clone(&witnesses_storage_d),
bip30_index: bip30_index.clone(),
recent_headers: recent_snap,
tx_ids: tx_ids_precomputed_d,
cached_network_time,
keys: keys_for_job,
spec_adds_snapshot,
prefetched: prefetched_utxos_d,
});
if job_send.is_err() {
return match retire_thread_shutdown(&mut _retire_dispatcher, &retire_err) {
Ok(()) => Err(anyhow::anyhow!(
"IBD validate workers stopped (failed to send job at height {})",
h
)),
Err(e) => Err(e),
};
}
in_flight.push_back(InFlightEntry {
height: h,
block_arc: block_arc_d,
witnesses_storage: witnesses_storage_d,
feeder_est_bytes: feeder_est_bytes_d,
utxo_base_ms: 0,
utxo_base_tune_ms: 0,
prefetch_ms: 0,
apply_pending_ms: 0,
input_keys: None,
});
next_validation_height = h + 1;
}
if in_flight.is_empty() {
break;
}
let next_process_h = in_flight.front().unwrap().height;
while let Ok(vres) = valres_rx.try_recv() {
if let Some(set) = spec_adds.remove(&vres.height) {
let freed = (set.len() as u64).saturating_mul(64);
spec_adds_bytes.fetch_sub(
freed.min(spec_adds_bytes.load(Ordering::Relaxed)),
Ordering::Relaxed,
);
}
pending_results.insert(vres.height, vres);
}
while !pending_results.contains_key(&next_process_h) {
match valres_rx.recv() {
Ok(vres) => {
if let Some(set) = spec_adds.remove(&vres.height) {
let freed = (set.len() as u64).saturating_mul(64);
spec_adds_bytes.fetch_sub(
freed.min(spec_adds_bytes.load(Ordering::Relaxed)),
Ordering::Relaxed,
);
}
pending_results.insert(vres.height, vres);
}
Err(_) => {
return match retire_thread_shutdown(&mut _retire_dispatcher, &retire_err) {
Ok(()) => Err(anyhow::anyhow!(
"IBD validate workers disconnected at height {}",
next_process_h
)),
Err(e) => Err(e),
};
}
}
}
let mut entry = in_flight.pop_front().unwrap();
let vres = pending_results.remove(&next_process_h).unwrap();
while spec_adds
.first_key_value()
.map(|(sh, _)| *sh <= next_process_h)
.unwrap_or(false)
{
if let Some((_, set)) = spec_adds.pop_first() {
let freed = (set.len() as u64).saturating_mul(64);
spec_adds_bytes.fetch_sub(
freed.min(spec_adds_bytes.load(Ordering::Relaxed)),
Ordering::Relaxed,
);
}
}
let next_height = entry.height;
let block_arc = entry.block_arc.clone();
let witnesses_storage = entry.witnesses_storage.clone();
let feeder_est_bytes = entry.feeder_est_bytes;
let utxo_base_ms = vres.view_build_ms;
let utxo_base_tune_ms_holder = vres.view_build_ms;
let prefetch_ms = entry.prefetch_ms;
let apply_pending_ms = entry.apply_pending_ms;
keys_v2_buf.clear();
let witnesses_to_use: &[Vec<Witness>] = witnesses_storage.as_slice();
bip30_index = vres.bip30_post;
let validation_time = vres.elapsed;
let validation_result = vres.result;
#[cfg(feature = "profile")]
let ibd_log_this_height =
ibd_blocked_log && ibd_profile_height_matches_sample(ibd_profile_sample, next_height);
#[cfg(feature = "profile")]
if ibd_log_this_height {
blvm_protocol::profile_log!(
"[IBD_VALIDATION] height={} phase=start (validate+suggested sync)",
next_height
);
}
let (sync_ms, evict_ms, utxo_flush_batch, rss_pressure, apply_utxo_ms, validation_result) =
match validation_result {
Ok(utxo_delta_opt) => {
let delta = Arc::new(utxo_delta_opt.unwrap_or_else(empty_utxo_delta));
{
let mut m = staged.lock();
m.insert(next_height, delta);
}
let retire_blocks_buf = if ibd_store_v2_for_validation.is_dynamic_eviction() {
blocks_buf.clone()
} else {
Vec::new()
};
if _retire_dispatcher
.send(IbdRetireWork {
height: next_height,
blocks_buf: retire_blocks_buf,
block: Arc::clone(&block_arc),
})
.is_err()
{
return match retire_thread_shutdown(&mut _retire_dispatcher, &retire_err) {
Ok(()) => Err(anyhow::anyhow!(
"IBD retire thread stopped (failed to send retire work at height {})",
next_height
)),
Err(e) => Err(e),
};
}
(
0u64,
0u64,
None::<PendingFlushPackage>,
false,
0u64,
Ok(None::<UtxoDelta>),
)
}
Err(e) => (0u64, 0u64, None, false, 0u64, Err(e)),
};
let utxo_base_tune_ms = utxo_base_tune_ms_holder;
#[allow(unused_variables)]
let gap_fill_ms = 0u64;
let ibd_pressure = memory::ibd_pressure_level_snapshot();
let ms = utxo_base_tune_ms as f64;
let ema = match prefetch_base_ema {
None => {
prefetch_base_ema = Some(ms);
ms
}
Some(prev) => {
let n = prev * (63.0 / 64.0) + ms * (1.0 / 64.0);
prefetch_base_ema = Some(n);
n
}
};
let mut target = nominal_prefetch_lookahead;
if ema > 12.0 {
target = (nominal_prefetch_lookahead + 32).min(128);
} else if ema > 8.0 {
target = (nominal_prefetch_lookahead * 4 / 3).min(128);
} else if ema > 5.0 {
target = (nominal_prefetch_lookahead + 16).min(128);
} else if ema < 0.75 && blocks_synced > 1_000 {
target = nominal_prefetch_lookahead.saturating_sub(8).max(48);
}
let with_pressure = dynamic_prefetch_lookahead(ibd_pressure, target);
utxo_prefetch_lookahead_live.store(with_pressure, Ordering::Relaxed);
#[cfg(feature = "profile")]
if ibd_log_this_height {
blvm_protocol::profile_log!(
"[IBD_VALIDATION] height={} phase=end utxo_base_ms={} validation_ms={} apply_utxo_ms={} apply_pending_ms={} sync_ms={} evict_ms={}",
next_height,
utxo_base_ms,
validation_time.as_millis(),
apply_utxo_ms,
apply_pending_ms,
sync_ms,
evict_ms
);
if apply_pending_ms > 2 {
blvm_protocol::profile_log!(
"[IBD_BLOCKED] phase=apply_pending height={} duration_ms={} (pending_writes/flushing scan for cache hits)",
next_height, apply_pending_ms
);
}
if sync_ms > 5 {
blvm_protocol::profile_log!(
"[IBD_BLOCKED] phase=sync_await height={} duration_ms={} (validation waited for previous block sync+evict)",
next_height, sync_ms
);
}
}
if let Err(ref e) = validation_result {
error!(
"Failed to prefetch/validate block at height {}: {}",
next_height, e
);
}
match validation_result {
Ok(_utxo_delta) => {
blocks_synced += 1;
let n_txs = block_arc.transactions.len();
let n_inputs: usize = block_arc
.transactions
.iter()
.map(|tx| tx.inputs.len())
.sum();
let header_rc = Arc::new(block_arc.header.clone());
if !skip_storage {
pending_storage_bytes =
pending_storage_bytes.saturating_add(feeder_est_bytes as u64);
pending_blocks.push((block_arc, Arc::clone(&witnesses_storage), next_height));
}
recent_headers_buf.push_back(header_rc);
if recent_headers_buf.len() > 11 {
recent_headers_buf.pop_front();
}
validation_height.store(next_height, Ordering::Relaxed);
let flush_interval_live = MemoryGuard::storage_flush_interval_live_for(
storage_flush_interval,
ibd_pressure,
);
let byte_cap = MemoryGuard::storage_flush_pending_bytes_pressure_cap_for(
ibd_budget_mb,
ibd_pressure,
);
let pressure_min_blocks =
MemoryGuard::storage_flush_pressure_min_blocks(flush_interval_live);
let flush_by_interval = pending_blocks.len() >= flush_interval_live;
let flush_by_pressure_bytes = byte_cap.is_some_and(|cap| {
pending_storage_bytes >= cap && pending_blocks.len() >= pressure_min_blocks
});
let (flush_ms, flushed_block_count) = if !skip_storage
&& (flush_by_interval || flush_by_pressure_bytes)
{
let flush_start = std::time::Instant::now();
while flush_handles.len() >= max_block_flushes_in_flight {
let in_flight = flush_handles.len();
let wait_start = std::time::Instant::now();
debug!(
"[IBD_DEBUG] Block {}: awaiting block storage flush slot (in_flight={}, pending_blocks={})",
next_height,
in_flight,
pending_blocks.len()
);
let Some(handle) = flush_handles.pop_front() else {
return match retire_thread_shutdown(
&mut _retire_dispatcher,
&retire_err,
) {
Ok(()) => Err(anyhow::anyhow!(
"IBD invariant violated: block storage flush wait queue empty under backpressure"
)),
Err(e) => Err(e),
};
};
match handle.join() {
Ok(Ok(())) => {
let waited_ms = wait_start.elapsed().as_millis() as u64;
debug!(
"[IBD_DEBUG] Block {}: block storage flush slot free (waited {}ms)",
next_height, waited_ms
);
#[cfg(feature = "profile")]
if ibd_blocked_log && waited_ms > 0 {
blvm_protocol::profile_log!(
"[IBD_BLOCKED] phase=block_flush_await height={} duration_ms={} in_flight={} utxo_flush={} (validation waited for block storage write)",
next_height,
waited_ms,
in_flight,
utxo_flush_handles.lock().len()
);
}
}
Ok(Err(e)) => {
return match retire_thread_shutdown(
&mut _retire_dispatcher,
&retire_err,
) {
Ok(()) => Err(e),
Err(e2) => Err(e2),
};
}
Err(e) => {
return match retire_thread_shutdown(
&mut _retire_dispatcher,
&retire_err,
) {
Ok(()) => Err(anyhow::anyhow!(
"Block storage flush thread panicked: {:?}",
e
)),
Err(e2) => Err(e2),
};
}
}
}
let to_flush = std::mem::take(&mut pending_blocks);
pending_storage_bytes = 0;
let blockstore_clone = Arc::clone(&blockstore);
let storage_for_flush = storage_clone.clone();
let to_flush_count = to_flush.len();
#[cfg(feature = "profile")]
if ibd_profile
&& ibd_profile_height_matches_sample(ibd_profile_sample, next_height)
{
blvm_protocol::profile_log!(
"[IBD_BLOCK_FLUSH_SPAWN] height={} blocks={} in_flight={}",
next_height,
to_flush_count,
flush_handles.len(),
);
}
flush_handles.push_back(std::thread::spawn(move || {
ParallelIBD::do_flush_to_storage(
blockstore_clone.as_ref(),
Some(&storage_for_flush),
to_flush,
)
}));
let flush_elapsed = flush_start.elapsed().as_millis() as u64;
debug!(
"[IBD_DEBUG] Block {}: spawned block storage flush (blocks={}, in_flight={}, await_took={}ms)",
next_height,
to_flush_count,
flush_handles.len(),
flush_elapsed
);
(flush_elapsed, to_flush_count)
} else {
(0, 0)
};
if !skip_storage && pending_blocks.is_empty() && flush_ms > 0 {
debug!(
"Started async flush ({} blocks, interval_live={}, pressure={:?}, by_bytes={}, {} in flight)",
flushed_block_count,
flush_interval_live,
ibd_pressure,
flush_by_pressure_bytes,
flush_handles.len()
);
}
#[cfg(feature = "profile")]
if ibd_profile {
let prefetch_await_ms = 0u64; let val_ms = validation_time.as_millis() as u64;
let total_ms = prefetch_await_ms
+ gap_fill_ms
+ prefetch_ms
+ utxo_base_ms
+ val_ms
+ apply_utxo_ms
+ sync_ms
+ evict_ms
+ flush_ms;
let disk_total = prefetch_await_ms
+ gap_fill_ms
+ prefetch_ms
+ sync_ms
+ evict_ms
+ flush_ms;
let should_log =
ibd_profile_height_matches_sample(ibd_profile_sample, next_height)
|| (ibd_disk_profile
&& (prefetch_await_ms > 0
|| gap_fill_ms > 0
|| prefetch_ms > 0
|| sync_ms > 0
|| evict_ms > 0))
|| (ibd_profile_slow_ms > 0
&& (prefetch_await_ms >= ibd_profile_slow_ms
|| gap_fill_ms >= ibd_profile_slow_ms
|| prefetch_ms >= ibd_profile_slow_ms
|| utxo_base_ms >= ibd_profile_slow_ms
|| val_ms >= ibd_profile_slow_ms
|| apply_utxo_ms >= ibd_profile_slow_ms
|| sync_ms >= ibd_profile_slow_ms
|| evict_ms >= ibd_profile_slow_ms
|| flush_ms >= ibd_profile_slow_ms));
if should_log && total_ms > 0 {
blvm_protocol::profile_log!(
"[IBD_PROFILE] height={} total_ms={} prefetch_await={} gap_fill={} prefetch={} utxo_base={} validation={} apply_utxo={} sync={} evict={} flush_coord={} disk_total={} txs={} inputs={}",
next_height,
total_ms,
prefetch_await_ms,
gap_fill_ms,
prefetch_ms,
utxo_base_ms,
val_ms,
apply_utxo_ms,
sync_ms,
evict_ms,
flush_ms,
disk_total,
n_txs,
n_inputs
);
let (dl, ch, ev, _ph) = ibd_store_v2_for_validation.stats();
let utxo_stats = (ibd_store_v2_for_validation.len(), dl, ch, ev);
blvm_protocol::profile_log!(
"[IBD_PIPELINE] height={} utxo_flush={} block_flush={} pending={} utxo_cache={} disk_loads={} cache_hits={} evictions={}",
next_height,
utxo_flush_handles.lock().len(),
flush_handles.len(),
pending_blocks.len(),
utxo_stats.0,
utxo_stats.1,
utxo_stats.2,
utxo_stats.3
);
}
}
}
Err(e) => {
for handle in utxo_flush_handles.lock().drain(..) {
let _ = handle.join();
}
for handle in flush_handles.drain(..) {
let _ = handle.join();
}
if !skip_storage && !pending_blocks.is_empty() {
let _ = parallel_ibd.flush_pending_blocks(
&blockstore,
Some(&storage_clone),
&mut pending_blocks,
);
}
error!("Failed to validate block at height {}: {}", next_height, e);
block_input_keys_into_filtered(block_arc.as_ref(), &mut keys_v2_buf);
{
let store = &ibd_store_v2_for_validation;
for k in keys_v2_buf.iter() {
let in_cache = store.cache_get(k).is_some();
if !in_cache {
error!(
"[IBD_MISSING_UTXO] height={} key={} in_cache=false (not in IbdUtxoStore cache at error time)",
next_height,
hex::encode(k),
);
}
}
}
let utxo_for_dump = ibd_store_v2_for_validation.build_utxo_map(&keys_v2_buf);
ParallelIBD::dump_failed_block(
next_height,
block_arc.as_ref(),
witnesses_to_use,
&utxo_for_dump,
&e,
);
return match retire_thread_shutdown(&mut _retire_dispatcher, &retire_err) {
Ok(()) => Err(e),
Err(e2) => Err(e2),
};
}
}
if yield_interval > 0 && blocks_synced % yield_interval == 0 {
#[cfg(feature = "profile")]
if ibd_profile && ibd_profile_height_matches_sample(ibd_profile_sample, next_height) {
blvm_protocol::profile_log!(
"[IBD_YIELD] blocks_synced={} utxo_flush={} block_flush={} (yielding to runtime)",
blocks_synced,
utxo_flush_handles.lock().len(),
flush_handles.len()
);
}
std::thread::yield_now();
}
if blocks_synced > 0 && blocks_synced % 500 == 0 {
#[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
{
let current_rss_mb = mem_mtx.lock().current_rss_mb();
let rss_growth_mb = current_rss_mb.saturating_sub(last_rss_mb);
let blocks_since_collect = blocks_synced.saturating_sub(last_collect_block);
if rss_growth_mb > 50 || blocks_since_collect >= 1000 {
ibd_maybe_heap_trim();
last_rss_mb = mem_mtx.lock().current_rss_mb();
last_collect_block = blocks_synced;
}
}
}
let should_log = blocks_synced == 1
|| blocks_synced == 10
|| blocks_synced == 100
|| (blocks_synced > 100
&& blocks_synced < 10_000
&& blocks_synced % 100 == 0
&& blocks_synced % 1000 != 0)
|| (blocks_synced > 0 && blocks_synced % 1000 == 0);
if should_log {
let total_elapsed = validation_start.elapsed().as_secs_f64();
let average_rate = if blocks_synced >= 100 && total_elapsed > 0.0 {
blocks_synced as f64 / total_elapsed
} else {
0.0
};
let blocks_since_last = blocks_synced.saturating_sub(last_log_blocks);
let recent_elapsed = last_log_instant.elapsed().as_secs_f64();
let recent_rate = if blocks_since_last > 0 && recent_elapsed > 0.01 {
blocks_since_last as f64 / recent_elapsed
} else {
0.0
};
last_log_blocks = blocks_synced;
last_log_instant = std::time::Instant::now();
let remaining = effective_end_height.saturating_sub(next_height);
let eta_rate = if blocks_synced >= 1000 && recent_rate > 0.0 {
recent_rate
} else if average_rate > 0.0 {
average_rate
} else {
f64::INFINITY
};
let eta = if eta_rate.is_finite() && eta_rate > 0.0 {
remaining as f64 / eta_rate
} else {
f64::INFINITY
};
let buffer_size = feeder_state.0.lock().0.len();
let rate_str = if blocks_synced < 100 {
"warming up (rate after block 100)".to_string()
} else if blocks_synced >= 1000 && blocks_since_last > 0 {
format!("{recent_rate:.1} blocks/s (avg since start: {average_rate:.1} blocks/s)")
} else {
format!("{average_rate:.1} blocks/s")
};
info!(
"IBD: {} / {} ({:.1}%) - {} - buffer: {} - ETA: {:.0}s",
next_height,
effective_end_height,
(next_height as f64 / effective_end_height as f64) * 100.0,
rate_str,
buffer_size,
eta
);
if blocks_synced % 5000 == 0 {
let (rss_kb, swap_kb) = {
#[cfg(target_os = "linux")]
{
let rss = std::fs::read_to_string("/proc/self/status")
.ok()
.and_then(|s| {
s.lines()
.find(|l| l.starts_with("VmRSS:"))
.and_then(|l| l.split_whitespace().nth(1))
.and_then(|v| v.parse::<u64>().ok())
})
.unwrap_or(0);
let swap = std::fs::read_to_string("/proc/self/status")
.ok()
.and_then(|s| {
s.lines()
.find(|l| l.starts_with("VmSwap:"))
.and_then(|l| l.split_whitespace().nth(1))
.and_then(|v| v.parse::<u64>().ok())
})
.unwrap_or(0);
(rss, swap)
}
#[cfg(not(target_os = "linux"))]
{
(0u64, 0u64)
}
};
let store_info = format!(
"utxo_cache={} pending={} inflight={} recent_prot={} spec_adds={}",
ibd_store_v2_for_validation.len(),
ibd_store_v2_for_validation.pending_len(),
ibd_store_v2_for_validation.in_flight_len(),
ibd_store_v2_for_validation.recently_accessed_len(),
spec_adds.len(),
);
info!(
"[MEM] h={} rss={}MB swap={}MB {} feeder={} threads={}",
next_height,
rss_kb / 1024,
swap_kb / 1024,
store_info,
buffer_size,
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(0)
);
}
if let Some(ref path) = ibd_bps_csv_path {
let elapsed_sec = validation_start.elapsed().as_secs();
let create_header = !std::path::Path::new(path).exists()
|| std::fs::metadata(path)
.map(|m| m.len() == 0)
.unwrap_or(true);
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
{
use std::io::Write;
if create_header {
let _ = writeln!(f, "height,elapsed_sec");
}
let _ = writeln!(f, "{next_height},{elapsed_sec}");
}
}
#[cfg(feature = "profile")]
if ibd_profile {
blvm_protocol::profile_log!(
"[IBD_PREFETCH_STATS] height={} utxo_flush={} block_flush={}",
next_height,
utxo_flush_handles.lock().len(),
flush_handles.len()
);
if blocks_synced > 0 && blocks_synced % 5000 == 0 {
let (dl, ch, ev, ph) = ibd_store_v2_for_validation.stats();
blvm_protocol::profile_log!(
"[IBD_UTXO_PATH] height={} disk_loads={} cache_hits={} evictions={} pending_hits={} cache_len={} (cumulative since start)",
next_height,
dl,
ch,
ev,
ph,
ibd_store_v2_for_validation.len()
);
}
if let Some((rss_mb, avail_mb)) = mem_mtx.lock().memory_diag() {
blvm_protocol::profile_log!(
"[IBD_DIAG] height={} rss_mb={} avail_mb={} utxo_flush={} block_flush={}",
next_height,
rss_mb,
avail_mb,
utxo_flush_handles.lock().len(),
flush_handles.len()
);
}
}
}
}
drop(valjob_tx);
for worker in _validate_workers {
if let Err(e) = worker.join() {
warn!("IBD validate worker join error: {:?}", e);
}
}
retire_thread_shutdown(&mut _retire_dispatcher, &retire_err)?;
if let Some(pkg) = ibd_store_v2_for_validation.take_remaining_flush_package() {
let store_clone = Arc::clone(&ibd_store_v2_for_validation);
let storage_shutdown = Arc::clone(&storage_clone);
let mh_shutdown = Arc::clone(&ibd_muhash_accumulator);
let heights = Arc::clone(&pkg.heights);
utxo_flush_handles
.lock()
.push_back(std::thread::spawn(move || {
let prepared = pkg.prepare_for_disk()?;
let muhash_running = {
let mut mh_guard = mh_shutdown.lock();
store_clone.flush_prepared_package(&prepared, Some(&mut *mh_guard))?;
mh_guard.serialize_running_state()
};
store_clone.flush_disk()?;
storage_shutdown.chain().persist_ibd_utxo_flush_checkpoint(
prepared.max_block_height,
&muhash_running,
)?;
store_clone.release_protected_heights(&heights);
store_clone.note_utxo_flush_completed(prepared.max_block_height);
Ok(())
}));
}
for handle in utxo_flush_handles.lock().drain(..) {
match handle.join() {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!("UTXO flush panicked at shutdown: {:?}", e));
}
}
}
let last_validated = next_validation_height.saturating_sub(1);
if let Err(e) = ibd_store_v2_for_validation.flush_disk() {
warn!(
"Failed to flush ibd_utxos memtable at final shutdown (height {}): {}",
last_validated, e
);
}
for handle in flush_handles.drain(..) {
match handle.join() {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(anyhow::anyhow!(
"Block storage flush thread panicked: {:?}",
e
));
}
}
}
if !skip_storage && !pending_blocks.is_empty() {
info!("Flushing final {} pending blocks", pending_blocks.len());
parallel_ibd.flush_pending_blocks(
&blockstore,
Some(&storage_clone),
&mut pending_blocks,
)?;
}
Ok(())
}
/// Retire-thread main loop for builds with UTXO commitments enabled.
///
/// Receives per-block [`IbdRetireWork`] items and, for each retired height:
/// 1. updates the UTXO commitment merkle tree from the staged delta,
/// 2. applies the delta to the IBD UTXO store (evicting/flushing as needed),
/// 3. persists the per-block commitment,
/// 4. publishes retire progress and adapts the pending-ops cap,
/// 5. spawns a background UTXO flush when the store hands back a package.
///
/// On a receive timeout it performs emergency eviction and a forced flush
/// when memory pressure is Critical or worse. Any fatal error is stored in
/// `retire_err` and the loop returns; a disconnected `work_rx` means normal
/// shutdown and simply breaks the loop.
#[cfg(all(feature = "utxo-commitments", feature = "production"))]
#[allow(clippy::too_many_arguments)]
fn run_ibd_retire_loop_with_commitment(
    work_rx: mpsc::Receiver<IbdRetireWork>,
    staged: Arc<Mutex<BTreeMap<u64, Arc<UtxoDelta>>>>,
    local_last_retired: Arc<AtomicU64>,
    publisher: Arc<super::retire_dispatcher::GlobalProgressPublisher>,
    store: Arc<IbdUtxoStore>,
    storage_wm: Arc<Storage>,
    mem_mtx: Arc<Mutex<MemoryGuard>>,
    max_ahead_live: Arc<AtomicU64>,
    nominal_max_ahead: u64,
    ibd_defer_flush: bool,
    ibd_defer_checkpoint: u64,
    max_utxo_flushes_under_pressure: usize,
    utxo_flush_handles: Arc<Mutex<VecDeque<JoinHandle<Result<()>>>>>,
    retire_flush_counter: Arc<AtomicUsize>,
    retire_err: Arc<Mutex<Option<anyhow::Error>>>,
    blockstore: Arc<BlockStore>,
    commitment_tree: Option<
        Arc<Mutex<blvm_protocol::utxo_commitments::merkle_tree::UtxoMerkleTree>>,
    >,
    commitment_cstore: Option<Arc<crate::storage::commitment_store::CommitmentStore>>,
    ibd_muhash: Arc<Mutex<blvm_muhash::MuHash3072>>,
    max_pending_ops: Arc<AtomicUsize>,
    max_pending_ops_nominal: usize,
    max_pending_ops_last_adapt_ms: Arc<AtomicU64>,
) {
    // Scratch buffers reused across iterations to avoid per-block allocations.
    let mut keys_buf: Vec<OutPointKey> = Vec::new();
    let mut keys_seen = rustc_hash::FxHashSet::default();
    let mut evict_scratch: Vec<(OutPointKey, u64)> = Vec::new();
    loop {
        // Bounded wait so the loop can still react to memory pressure while idle.
        let work = match work_rx.recv_timeout(Duration::from_millis(100)) {
            Ok(w) => w,
            // Sender dropped: normal shutdown path.
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                // No work arrived; use the idle tick to relieve memory pressure.
                if memory::ibd_pressure_is_emergency() {
                    // Recompute and publish the current pressure level under the guard.
                    let level = {
                        let mut mem = mem_mtx.lock();
                        let level = mem.should_flush(Some((&max_ahead_live, nominal_max_ahead)));
                        memory::publish_ibd_pressure(level);
                        level
                    };
                    if level >= PressureLevel::Critical {
                        // Only evict when enough unprotected entries exist to make it worthwhile.
                        let evictable = store.len().saturating_sub(store.protected_len());
                        if evictable >= IBD_EMERGENCY_EVICT_MIN_UNPROTECTED {
                            store.evict_aggressive_for_rss();
                        }
                        // Intentional no-op touch of the progress counter on this path.
                        let _ = local_last_retired;
                        // Force a flush batch out of the store to shed memory;
                        // height 0 marks this as a pressure-driven (not block-driven) flush.
                        if let Some(pkg) = store.take_flush_batch_force() {
                            if let Err(e) = push_utxo_flush_from_retire(
                                &store,
                                &storage_wm,
                                &utxo_flush_handles,
                                &retire_flush_counter,
                                0,
                                max_utxo_flushes_under_pressure,
                                pkg,
                                &ibd_muhash,
                            ) {
                                // Surface the failure to the coordinator and stop retiring.
                                *retire_err.lock() = Some(e);
                                return;
                            }
                        }
                        ibd_maybe_heap_trim();
                    }
                }
                continue;
            }
        };
        let h = work.height;
        // Clone the staged delta Arc out under a short lock; the entry itself
        // is removed only after the block is fully retired (see below).
        let delta_arc = {
            let g = staged.lock();
            g.get(&h).cloned()
        };
        // Commitment tree update. Deletions must be resolved against the store
        // BEFORE the delta is applied below, while the spent UTXOs still exist.
        if let (Some(cref), Some(_), Some(delta_arc)) = (
            commitment_tree.as_ref(),
            commitment_cstore.as_ref(),
            delta_arc.as_ref(),
        ) {
            let mut t = cref.lock();
            let store_r = store.as_ref();
            for dk in &delta_arc.deletions {
                let op = blvm_protocol::utxo_overlay::utxo_deletion_key_to_outpoint(dk);
                let key = outpoint_to_key(&op);
                if let Some(utxo) = store_r.get(&key) {
                    // Remove failures are logged but non-fatal.
                    if let Err(e) = t.remove(&op, &utxo) {
                        warn!("IBD commitment: remove failed at height {}: {}", h, e);
                    }
                }
            }
            for (op, arc) in &delta_arc.additions {
                if let Err(e) = t.insert(*op, arc.as_ref().clone()) {
                    warn!("IBD commitment: insert failed at height {}: {}", h, e);
                }
            }
        }
        // Apply the UTXO delta for this height while holding the memory guard;
        // the helper may hand back a flush package (opt_pkg) to run off-thread.
        let mut mem = mem_mtx.lock();
        let (opt_pkg, _) = {
            let (_s, _e, p, r) = ibd_v2_retire_apply_utxo_delta(
                h,
                store.as_ref(),
                &work.blocks_buf,
                &mut keys_buf,
                &mut keys_seen,
                &mut evict_scratch,
                &mut mem,
                &max_ahead_live,
                nominal_max_ahead,
                ibd_defer_flush,
                ibd_defer_checkpoint,
            );
            (p, r)
        };
        // Release the memory guard before taking the commitment-tree lock.
        drop(mem);
        // Generate and persist the per-block commitment; a store failure here
        // is fatal (unlike tree insert/remove warnings above).
        if let (Some(cref), Some(cstore)) = (commitment_tree.as_ref(), commitment_cstore.as_ref()) {
            let block_hash = blockstore.get_block_hash(work.block.as_ref());
            let commitment = {
                let t = cref.lock();
                t.generate_commitment(block_hash, h)
            };
            if let Err(e) = cstore.store_commitment(&block_hash, h, &commitment) {
                warn!("IBD commitment: store failed at height {}: {}", h, e);
                *retire_err.lock() = Some(e);
                return;
            }
        }
        // Publish retire progress, then adapt the pending-ops cap to the
        // current pressure level and store backlog.
        publisher.publish(&local_last_retired, h);
        adapt_max_pending_ops_tick(
            &max_pending_ops,
            max_pending_ops_nominal,
            memory::ibd_pressure_level_snapshot(),
            store.pending_len(),
            &max_pending_ops_last_adapt_ms,
        );
        // Only now drop the staged delta: the height is fully retired.
        staged.lock().remove(&h);
        // Hand any flush package produced by the apply step to a background thread.
        if let Some(pkg) = opt_pkg {
            if let Err(e) = push_utxo_flush_from_retire(
                &store,
                &storage_wm,
                &utxo_flush_handles,
                &retire_flush_counter,
                h,
                max_utxo_flushes_under_pressure,
                pkg,
                &ibd_muhash,
            ) {
                *retire_err.lock() = Some(e);
                return;
            }
        }
    }
}
/// Retire-thread main loop for builds WITHOUT UTXO commitments.
///
/// Mirror of `run_ibd_retire_loop_with_commitment` minus all commitment-tree
/// and commitment-store work: for each retired height it applies the UTXO
/// delta to the IBD store, publishes retire progress, adapts the pending-ops
/// cap, drops the staged delta, and spawns background UTXO flushes when the
/// store hands back a package.
///
/// On a receive timeout it performs emergency eviction and a forced flush
/// when memory pressure is Critical or worse. Any fatal flush error is
/// stored in `retire_err` and the loop returns; a disconnected `work_rx`
/// means normal shutdown and breaks the loop.
#[cfg(not(all(feature = "utxo-commitments", feature = "production")))]
#[allow(clippy::too_many_arguments)]
fn run_ibd_retire_loop_no_commitment(
    work_rx: mpsc::Receiver<IbdRetireWork>,
    staged: Arc<Mutex<BTreeMap<u64, Arc<UtxoDelta>>>>,
    local_last_retired: Arc<AtomicU64>,
    publisher: Arc<super::retire_dispatcher::GlobalProgressPublisher>,
    store: Arc<IbdUtxoStore>,
    storage_wm: Arc<Storage>,
    mem_mtx: Arc<Mutex<MemoryGuard>>,
    max_ahead_live: Arc<AtomicU64>,
    nominal_max_ahead: u64,
    ibd_defer_flush: bool,
    ibd_defer_checkpoint: u64,
    max_utxo_flushes_under_pressure: usize,
    utxo_flush_handles: Arc<Mutex<VecDeque<JoinHandle<Result<()>>>>>,
    retire_flush_counter: Arc<AtomicUsize>,
    retire_err: Arc<Mutex<Option<anyhow::Error>>>,
    ibd_muhash: Arc<Mutex<blvm_muhash::MuHash3072>>,
    max_pending_ops: Arc<AtomicUsize>,
    max_pending_ops_nominal: usize,
    max_pending_ops_last_adapt_ms: Arc<AtomicU64>,
) {
    // Scratch buffers reused across iterations to avoid per-block allocations.
    let mut keys_buf: Vec<OutPointKey> = Vec::new();
    let mut keys_seen = rustc_hash::FxHashSet::default();
    let mut evict_scratch: Vec<(OutPointKey, u64)> = Vec::new();
    loop {
        // Bounded wait so the loop can still react to memory pressure while idle.
        let work = match work_rx.recv_timeout(Duration::from_millis(100)) {
            Ok(w) => w,
            // Sender dropped: normal shutdown path.
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                // No work arrived; use the idle tick to relieve memory pressure.
                if memory::ibd_pressure_is_emergency() {
                    // Recompute and publish the current pressure level under the guard.
                    let level = {
                        let mut mem = mem_mtx.lock();
                        let level = mem.should_flush(Some((&max_ahead_live, nominal_max_ahead)));
                        memory::publish_ibd_pressure(level);
                        level
                    };
                    if level >= PressureLevel::Critical {
                        // Only evict when enough unprotected entries exist to make it worthwhile.
                        let evictable = store.len().saturating_sub(store.protected_len());
                        if evictable >= IBD_EMERGENCY_EVICT_MIN_UNPROTECTED {
                            store.evict_aggressive_for_rss();
                        }
                        // Intentional no-op touch of the progress counter on this path.
                        let _ = local_last_retired;
                        // Force a flush batch out of the store to shed memory;
                        // height 0 marks this as a pressure-driven (not block-driven) flush.
                        if let Some(pkg) = store.take_flush_batch_force() {
                            if let Err(e) = push_utxo_flush_from_retire(
                                &store,
                                &storage_wm,
                                &utxo_flush_handles,
                                &retire_flush_counter,
                                0,
                                max_utxo_flushes_under_pressure,
                                pkg,
                                &ibd_muhash,
                            ) {
                                // Surface the failure to the coordinator and stop retiring.
                                *retire_err.lock() = Some(e);
                                return;
                            }
                        }
                        ibd_maybe_heap_trim();
                    }
                }
                continue;
            }
        };
        let h = work.height;
        // Apply the UTXO delta for this height while holding the memory guard;
        // the helper may hand back a flush package (opt_pkg) to run off-thread.
        let mut mem = mem_mtx.lock();
        let (opt_pkg, _) = {
            let (_s, _e, p, r) = ibd_v2_retire_apply_utxo_delta(
                h,
                store.as_ref(),
                &work.blocks_buf,
                &mut keys_buf,
                &mut keys_seen,
                &mut evict_scratch,
                &mut mem,
                &max_ahead_live,
                nominal_max_ahead,
                ibd_defer_flush,
                ibd_defer_checkpoint,
            );
            (p, r)
        };
        // Release the memory guard before publishing/adapting below.
        drop(mem);
        // Publish retire progress, then adapt the pending-ops cap to the
        // current pressure level and store backlog.
        publisher.publish(&local_last_retired, h);
        adapt_max_pending_ops_tick(
            &max_pending_ops,
            max_pending_ops_nominal,
            memory::ibd_pressure_level_snapshot(),
            store.pending_len(),
            &max_pending_ops_last_adapt_ms,
        );
        // Only now drop the staged delta: the height is fully retired.
        staged.lock().remove(&h);
        // Hand any flush package produced by the apply step to a background thread.
        if let Some(pkg) = opt_pkg {
            if let Err(e) = push_utxo_flush_from_retire(
                &store,
                &storage_wm,
                &utxo_flush_handles,
                &retire_flush_counter,
                h,
                max_utxo_flushes_under_pressure,
                pkg,
                &ibd_muhash,
            ) {
                *retire_err.lock() = Some(e);
                return;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared pending-ops cap seeded with `initial`.
    fn cap_with(initial: usize) -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(initial))
    }

    /// A "last adapt" timestamp of zero — stale enough that the adapt
    /// throttle never suppresses the tick.
    fn stale_adapt_clock() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    /// With a nominal cap of zero, no pressure level may move the cap.
    #[test]
    fn adapt_nominal_zero_is_inert() {
        let every_level = [
            PressureLevel::None,
            PressureLevel::Elevated,
            PressureLevel::Critical,
            PressureLevel::Emergency,
        ];
        let cap = cap_with(0);
        let clock = stale_adapt_clock();
        for level in every_level {
            adapt_max_pending_ops_tick(&cap, 0, level, 1_000_000, &clock);
            assert_eq!(cap.load(Ordering::Relaxed), 0);
        }
    }

    /// Emergency pressure shrinks the cap, bounded below by the 100k floor.
    #[test]
    fn adapt_emergency_halves_cap() {
        let cap = cap_with(8_000_000);
        let clock = stale_adapt_clock();
        adapt_max_pending_ops_tick(&cap, 8_000_000, PressureLevel::Emergency, 8_000_000, &clock);
        let shrunk = cap.load(Ordering::Relaxed);
        assert!(shrunk < 8_000_000, "Emergency must shrink");
        assert!(shrunk >= 100_000, "Emergency must respect 100k floor");
    }

    /// Critical pressure shrinks the cap, bounded below by nominal/8.
    #[test]
    fn adapt_critical_multiplies_by_three_quarters() {
        let cap = cap_with(8_000_000);
        let clock = stale_adapt_clock();
        adapt_max_pending_ops_tick(&cap, 8_000_000, PressureLevel::Critical, 8_000_000, &clock);
        let shrunk = cap.load(Ordering::Relaxed);
        assert!(shrunk < 8_000_000);
        assert!(
            shrunk >= 8_000_000 / 8,
            "Critical must respect nominal/8 floor"
        );
    }

    /// Elevated pressure is a hold: the cap stays where it was.
    #[test]
    fn adapt_elevated_is_hold() {
        let cap = cap_with(8_000_000);
        let clock = stale_adapt_clock();
        adapt_max_pending_ops_tick(&cap, 8_000_000, PressureLevel::Elevated, 8_000_000, &clock);
        assert_eq!(cap.load(Ordering::Relaxed), 8_000_000);
    }

    /// No pressure + a mostly-drained pending queue grows the cap,
    /// up to at most 2x nominal.
    #[test]
    fn adapt_none_grows_when_drain_keeps_up() {
        let cap = cap_with(8_000_000);
        let clock = stale_adapt_clock();
        adapt_max_pending_ops_tick(&cap, 8_000_000, PressureLevel::None, 100_000, &clock);
        let grown = cap.load(Ordering::Relaxed);
        assert!(grown > 8_000_000, "None + drain-ahead must grow cap");
        assert!(grown <= 16_000_000, "Must respect 2*nominal cap");
    }

    /// No pressure but a nearly-full pending queue holds the cap.
    #[test]
    fn adapt_none_holds_when_pending_full() {
        let cap = cap_with(8_000_000);
        let clock = stale_adapt_clock();
        adapt_max_pending_ops_tick(&cap, 8_000_000, PressureLevel::None, 7_000_000, &clock);
        assert_eq!(cap.load(Ordering::Relaxed), 8_000_000);
    }

    /// A fresh "last adapt" timestamp throttles the tick into a no-op,
    /// even under Emergency pressure.
    #[test]
    fn adapt_throttle_skips_recent_calls() {
        let cap = cap_with(8_000_000);
        let wall_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let recent = Arc::new(AtomicU64::new(wall_ms));
        adapt_max_pending_ops_tick(&cap, 8_000_000, PressureLevel::Emergency, 8_000_000, &recent);
        assert_eq!(cap.load(Ordering::Relaxed), 8_000_000);
    }

    /// Repeated Emergency ticks drive the cap well below nominal but
    /// never through the 100k hard floor.
    #[test]
    fn adapt_emergency_respects_floor_under_repeat() {
        let nominal = 8_000_000;
        let cap = cap_with(nominal);
        (0..50).for_each(|_| {
            // Fresh stale clock each round so the throttle never skips.
            adapt_max_pending_ops_tick(
                &cap,
                nominal,
                PressureLevel::Emergency,
                nominal,
                &stale_adapt_clock(),
            );
        });
        let final_cap = cap.load(Ordering::Relaxed);
        assert!(
            final_cap >= 100_000,
            "must respect hard floor (got {})",
            final_cap
        );
        assert!(
            final_cap <= nominal / 8,
            "must shrink well below nominal (got {} for nominal {})",
            final_cap,
            nominal
        );
    }
}