use crate::storage::database::Tree;
use crate::storage::disk_utxo::{
key_to_outpoint, load_keys_from_disk, outpoint_to_key, SyncBatch, MAX_BATCH_OPS,
};
use anyhow::Result;
use blvm_muhash::{serialize_coin_for_muhash, MuHash3072};
use blvm_protocol::block::compute_block_tx_ids;
use blvm_protocol::transaction::is_coinbase;
use blvm_protocol::types::{OutPoint, UtxoSet, UTXO};
use dashmap::{DashMap, DashSet};
use hex;
use rustc_hash::{FxHashMap, FxHashSet};
#[cfg(feature = "production")]
use std::str::FromStr;
use std::sync::atomic::{AtomicIsize, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, OnceLock};
use tracing::debug;
/// Whether per-operation MuHash updates are enabled during IBD.
///
/// Checked once per process: set `BLVM_IBD_SKIP_PER_OP_MUHASH=1` (or `true`,
/// case-insensitive) to disable them; anything else leaves them on.
fn ibd_per_op_muhash_enabled() -> bool {
    static SKIP: OnceLock<bool> = OnceLock::new();
    let skip = *SKIP.get_or_init(|| {
        matches!(
            std::env::var("BLVM_IBD_SKIP_PER_OP_MUHASH").as_deref(),
            Ok(v) if v == "1" || v.eq_ignore_ascii_case("true")
        )
    });
    !skip
}
/// Fixed-width outpoint key: 32-byte tx hash followed by an 8-byte
/// big-endian output index (see `consensus_deletion_key_to_store_key`).
type OutPointKey = [u8; 40];
/// Build the canonical MuHash preimage for one coin during IBD by
/// delegating to `serialize_coin_for_muhash`.
#[inline]
fn utxo_muhash_preimage_ibd(op: &OutPoint, utxo: &UTXO) -> Vec<u8> {
    serialize_coin_for_muhash(
        &op.hash,
        op.index,
        utxo.height as u32, // height stored wider in UTXO; preimage uses u32
        utxo.is_coinbase,
        utxo.value,
        utxo.script_pubkey.as_ref(),
    )
}
/// Convert a consensus-layer deletion key (32-byte hash + 4-byte BE index)
/// into the 40-byte store key (32-byte hash + 8-byte BE index).
#[inline]
fn consensus_deletion_key_to_store_key(
    k: &blvm_protocol::utxo_overlay::UtxoDeletionKey,
) -> OutPointKey {
    // Widen the consensus u32 index to the store's u64 big-endian index.
    let index = u32::from_be_bytes(k[32..36].try_into().expect("4-byte index slice"));
    let mut out: OutPointKey = [0u8; 40];
    out[..32].copy_from_slice(&k[..32]);
    out[32..].copy_from_slice(&u64::from(index).to_be_bytes());
    out
}
/// Value side of a pending-log entry: `Some(utxo)` for an insert,
/// `None` for a delete.
pub(crate) type PendingValue = Option<Arc<UTXO>>;
/// One flush batch: (key, insert-or-delete) operations, sorted by key.
pub type PendingFlushBatch = Vec<(OutPointKey, PendingValue)>;
/// A drained, deduplicated batch of pending UTXO operations ready to flush.
#[derive(Clone)]
pub struct PendingFlushPackage {
    /// Deduplicated operations, key-sorted (see `dedupe_to_batch_and_max`).
    pub ops: Arc<PendingFlushBatch>,
    /// Highest block height represented in this batch.
    pub max_block_height: u64,
    /// Every non-zero block height touched by the batch (collected before
    /// dedupe), used to release protected heights after the flush lands.
    pub heights: Arc<FxHashSet<u32>>,
}
/// A flush package with all UTXO values pre-serialized into one slab.
/// Each row holds `Some((start, len))` into `slab`, or `None` for a delete.
pub struct PreparedFlushPackage {
    pub rows: Arc<Vec<(OutPointKey, Option<(u32, u32)>)>>,
    pub slab: Arc<Vec<u8>>,
    pub max_block_height: u64,
}
/// Sentinel height marking cache slots that are not eviction-protected.
pub(crate) const UNPROTECTED_HEIGHT: u32 = u32::MAX;
/// One cache slot: the UTXO plus an insertion generation (FIFO/LIFO
/// eviction ordering) and the block height that may protect it.
#[derive(Clone)]
pub struct UtxoCacheSlot {
    pub generation: u64,
    pub utxo: Arc<UTXO>,
    pub block_height: u32,
}
/// Pending-log record: (key, insert-or-delete value, block height).
type PendingLogEntry = (OutPointKey, PendingValue, u64);
/// Shard count for the pending log; must stay a power of two so the
/// mask below selects a valid shard index.
pub(crate) const PENDING_SHARDS: usize = 16;
const PENDING_SHARD_MASK: usize = PENDING_SHARDS - 1;
/// Pick a pending-log shard from the first byte of the tx hash; the mask
/// keeps the result in `0..PENDING_SHARDS`.
#[inline]
pub(crate) fn pending_shard_idx(key: &OutPointKey) -> usize {
    usize::from(key[0]) & PENDING_SHARD_MASK
}
/// Collapse the log so only the last (highest-height) entry survives for
/// each key, compacted to the front in ascending key order.
fn dedupe_pending_triples_in_place(v: &mut Vec<PendingLogEntry>) {
    if v.len() <= 1 {
        return;
    }
    // Sort by (key, height) so the final entry of each equal-key run is
    // the one with the greatest height.
    v.sort_unstable_by_key(|(k, _, h)| (*k, *h));
    let mut write = 0usize;
    for i in 0..v.len() {
        // An element is the last of its run when the next element (if any)
        // has a different key. Positions > i are untouched by earlier swaps,
        // so this comparison always sees original run data.
        let last_of_run = i + 1 == v.len() || v[i + 1].0 != v[i].0;
        if last_of_run {
            v.swap(write, i);
            write += 1;
        }
    }
    v.truncate(write);
}
/// Wrap raw drained log entries into a `PendingFlushPackage`, or `None`
/// when nothing was drained.
fn pack_flush_package(raw: Vec<PendingLogEntry>) -> Option<PendingFlushPackage> {
    if raw.is_empty() {
        None
    } else {
        let (ops, max_block_height, heights) = dedupe_to_batch_and_max(raw);
        Some(PendingFlushPackage {
            ops: Arc::new(ops),
            max_block_height,
            heights: Arc::new(heights),
        })
    }
}
/// Deduplicate drained entries (last write per key wins), returning the
/// key-sorted batch, the maximum block height, and every non-zero height
/// touched before dedupe (needed to release protected heights later).
fn dedupe_to_batch_and_max(
    mut entries: Vec<PendingLogEntry>,
) -> (PendingFlushBatch, u64, FxHashSet<u32>) {
    if entries.is_empty() {
        return (Vec::new(), 0, FxHashSet::default());
    }
    // Heights must be collected before dedupe: a deduped-away entry may be
    // the only one carrying a given height.
    let touched: FxHashSet<u32> = entries
        .iter()
        .filter(|(_, _, h)| *h != 0)
        .map(|(_, _, h)| *h as u32)
        .collect();
    dedupe_pending_triples_in_place(&mut entries);
    let max_h = entries.iter().map(|(_, _, h)| *h).max().unwrap_or(0);
    let mut batch: PendingFlushBatch = entries.into_iter().map(|(k, v, _)| (k, v)).collect();
    batch.sort_unstable_by_key(|(k, _)| *k);
    (batch, max_h, touched)
}
#[cfg(feature = "production")]
impl PendingFlushPackage {
    /// Serialize every insert value into one contiguous byte slab so the
    /// flush thread can write rows without re-serializing. Each row maps a
    /// key to `Some((offset, len))` into the slab, or `None` for a delete.
    pub fn prepare_for_disk(&self) -> Result<PreparedFlushPackage> {
        let add_count = self.ops.iter().filter(|(_, v)| v.is_some()).count();
        // ~100 bytes per serialized UTXO is a sizing heuristic, not a limit.
        let mut slab: Vec<u8> = Vec::with_capacity(add_count * 100);
        let mut rows: Vec<(OutPointKey, Option<(u32, u32)>)> = Vec::with_capacity(self.ops.len());
        for (key, value) in self.ops.iter() {
            let span = if let Some(utxo) = value {
                let offset = slab.len() as u32;
                bincode::serialize_into(&mut slab, utxo.as_ref())
                    .map_err(|e| anyhow::anyhow!("UTXO serialize: {}", e))?;
                Some((offset, slab.len() as u32 - offset))
            } else {
                None
            };
            rows.push((*key, span));
        }
        Ok(PreparedFlushPackage {
            rows: Arc::new(rows),
            slab: Arc::new(slab),
            max_block_height: self.max_block_height,
        })
    }
}
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg(feature = "production")]
pub enum EvictionStrategy {
    Dynamic,
    Fifo,
    Lifo,
}
#[cfg(feature = "production")]
impl FromStr for EvictionStrategy {
    type Err = std::convert::Infallible;
    /// Case-insensitive parse; any unrecognized value falls back to FIFO,
    /// so this never fails.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.trim().to_lowercase();
        Ok(if normalized == "dynamic" {
            Self::Dynamic
        } else if normalized == "lifo" {
            Self::Lifo
        } else {
            Self::Fifo
        })
    }
}
#[cfg(feature = "production")]
impl EvictionStrategy {
    /// Strategy from `BLVM_IBD_EVICTION`; FIFO when unset or unrecognized.
    fn from_env() -> Self {
        std::env::var("BLVM_IBD_EVICTION")
            .unwrap_or_default()
            .parse()
            .unwrap_or(Self::Fifo)
    }
}
// Dynamic eviction never removes UTXOs younger than this many blocks.
const EVICT_MIN_AGE_BLOCKS: u64 = 100;
// UTXOs older than this are preferred eviction victims.
const EVICT_VERY_OLD_BLOCKS: u64 = 10_000;
// Upper bound on cache entries examined in one eviction pass.
const EVICT_SCAN_CAP: usize = 16_384;
/// Hybrid in-memory / on-disk UTXO store used during initial block download.
///
/// Reads consult the bounded `cache` first, then the in-flight insert map,
/// then `disk`. Writes land in the cache immediately and are logged in
/// `pending_shards` until a background flush persists them.
#[cfg(feature = "production")]
pub struct IbdUtxoStore {
    // Hot cache of UTXOs keyed by the 40-byte outpoint key.
    cache: DashMap<OutPointKey, UtxoCacheSlot>,
    // Durable backing tree holding flushed UTXO rows.
    disk: Arc<dyn Tree>,
    // Net live-UTXO count (inserts minus deletes); signed because deletes
    // can transiently outpace observed inserts.
    total_utxo_count: AtomicIsize,
    // Pending-log size that triggers a flush.
    flush_threshold: usize,
    // Sharded log of not-yet-flushed operations (see `pending_shard_idx`).
    pending_shards: Vec<Mutex<Vec<PendingLogEntry>>>,
    // Approximate total entry count across all pending shards.
    pending_log_size: AtomicUsize,
    // When true, disk is never read or written.
    memory_only: bool,
    // Soft cap on cache entries; usize::MAX means unbounded.
    max_entries_cap: AtomicUsize,
    eviction_strategy: EvictionStrategy,
    // Keys spared from dynamic eviction (expected to be spent soon).
    recently_accessed: Mutex<FxHashSet<OutPointKey>>,
    // Monotonic stamp assigned on cache insert, for FIFO/LIFO ordering.
    cache_generation: AtomicU64,
    // Highest block height whose UTXO ops are durable on disk.
    utxo_disk_commit_height: AtomicU64,
    // Mutex + condvar backing `wait_utxo_disk_through`.
    utxo_barrier_mu: Mutex<()>,
    utxo_barrier_cv: Condvar,
    // Inserts drained from the pending log but not yet on disk; consulted
    // on cache misses so bounded caches never lose them.
    in_flight_insertions: DashMap<OutPointKey, Arc<UTXO>>,
    // Block heights whose cache slots must not be evicted yet.
    protected_heights: DashSet<u32>,
    // Counters surfaced by `stats()`.
    stats_disk_loads: AtomicU64,
    stats_cache_hits: AtomicU64,
    stats_evictions: AtomicU64,
    stats_pending_hits: AtomicU64,
}
#[cfg(feature = "production")]
impl IbdUtxoStore {
/// Disk-backed store with defaults: unbounded cache, eviction strategy
/// from `BLVM_IBD_EVICTION`, disk commit height starting at 0.
pub fn new(disk: Arc<dyn Tree>, flush_threshold: usize) -> Self {
    Self::new_with_options(
        disk,
        flush_threshold,
        false,
        usize::MAX,
        EvictionStrategy::from_env(),
        0,
    )
}
/// Purely in-memory store: the backing tree is a no-op stub, the flush
/// threshold is effectively infinite, and the cache is unbounded.
pub fn new_memory_only() -> Self {
    // Stub Tree: reads always miss, writes and batches are discarded.
    struct NullTree;
    impl Tree for NullTree {
        fn insert(&self, _: &[u8], _: &[u8]) -> Result<()> {
            Ok(())
        }
        fn get(&self, _: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }
        fn remove(&self, _: &[u8]) -> Result<()> {
            Ok(())
        }
        fn contains_key(&self, _: &[u8]) -> Result<bool> {
            Ok(false)
        }
        fn clear(&self) -> Result<()> {
            Ok(())
        }
        fn len(&self) -> Result<usize> {
            Ok(0)
        }
        fn iter(&self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + '_> {
            Box::new(std::iter::empty())
        }
        fn batch(&self) -> Result<Box<dyn crate::storage::database::BatchWriter + '_>> {
            // Batch writer that accepts and drops every operation.
            struct NullBatch;
            impl crate::storage::database::BatchWriter for NullBatch {
                fn put(&mut self, _: &[u8], _: &[u8]) {}
                fn delete(&mut self, _: &[u8]) {}
                fn commit(self: Box<Self>) -> Result<()> {
                    Ok(())
                }
                fn len(&self) -> usize {
                    0
                }
            }
            Ok(Box::new(NullBatch))
        }
    }
    Self::new_with_options(
        Arc::new(NullTree),
        usize::MAX,
        true,
        usize::MAX,
        EvictionStrategy::from_env(),
        0,
    )
}
/// True when this store never touches disk.
#[inline]
pub fn memory_only(&self) -> bool {
    self.memory_only
}
/// Fully-parameterized constructor.
///
/// * `flush_threshold` — pending-log size that triggers a flush.
/// * `memory_only` — when true, disk is never consulted.
/// * `max_entries` — cache cap; `usize::MAX` disables eviction.
/// * `utxo_disk_commit_through` — initial durable-on-disk height (used
///   when resuming from an existing database).
pub fn new_with_options(
    disk: Arc<dyn Tree>,
    flush_threshold: usize,
    memory_only: bool,
    max_entries: usize,
    eviction_strategy: EvictionStrategy,
    utxo_disk_commit_through: u64,
) -> Self {
    Self {
        // 128 shards reduces DashMap lock contention under IBD parallelism.
        cache: DashMap::with_shard_amount(128),
        disk,
        total_utxo_count: AtomicIsize::new(0),
        flush_threshold,
        pending_shards: (0..PENDING_SHARDS)
            .map(|_| Mutex::new(Vec::new()))
            .collect(),
        pending_log_size: AtomicUsize::new(0),
        memory_only,
        max_entries_cap: AtomicUsize::new(max_entries),
        eviction_strategy,
        recently_accessed: Mutex::new(FxHashSet::default()),
        // Generation starts at 1 so 0 can never collide with a real stamp.
        cache_generation: AtomicU64::new(1),
        utxo_disk_commit_height: AtomicU64::new(utxo_disk_commit_through),
        utxo_barrier_mu: Mutex::new(()),
        utxo_barrier_cv: Condvar::new(),
        in_flight_insertions: DashMap::default(),
        protected_heights: DashSet::new(),
        stats_disk_loads: AtomicU64::new(0),
        stats_cache_hits: AtomicU64::new(0),
        stats_evictions: AtomicU64::new(0),
        stats_pending_hits: AtomicU64::new(0),
    }
}
/// Current cache cap (`usize::MAX` = unbounded).
#[inline]
fn max_entries_effective(&self) -> usize {
    self.max_entries_cap.load(Ordering::Relaxed)
}
/// Public view of the cache cap.
#[inline]
pub fn cache_cap(&self) -> usize {
    self.max_entries_cap.load(Ordering::Relaxed)
}
/// Number of block heights currently protected from eviction.
#[inline]
pub fn protected_len(&self) -> usize {
    self.protected_heights.len()
}
/// Unpin heights whose UTXO operations are now durable on disk.
pub fn release_protected_heights(&self, heights: &FxHashSet<u32>) {
    for &h in heights {
        self.protected_heights.remove(&h);
    }
}
/// Adjust the cache cap in response to memory pressure. No-ops for
/// memory-only stores and for caches that were configured unbounded.
pub fn tune_max_entries_for_pressure(&self, new_cap: usize, current_height: u64) {
    if self.memory_only {
        return;
    }
    let old = self.max_entries_cap.load(Ordering::Relaxed);
    if old == usize::MAX {
        // An unbounded cache stays unbounded.
        return;
    }
    // Never shrink below a small floor.
    let new_cap = new_cap.max(4_096);
    if new_cap == old {
        return;
    }
    self.max_entries_cap.store(new_cap, Ordering::Relaxed);
    if new_cap < old {
        // On a shrink, evict immediately under the new cap.
        if self.eviction_strategy == EvictionStrategy::Dynamic {
            self.evict_if_needed(current_height);
        }
        self.maybe_evict_tl();
        // Only release map capacity back after a significant (>20%) shrink.
        if new_cap < old * 8 / 10 {
            self.cache.shrink_to_fit();
        }
    }
}
/// Next insertion stamp (relaxed: only ordering among stamps matters).
#[inline]
fn next_cache_generation(&self) -> u64 {
    self.cache_generation.fetch_add(1, Ordering::Relaxed)
}
/// Insert or overwrite a cache slot, stamped with a fresh generation.
/// `block_height` is `UNPROTECTED_HEIGHT` unless eviction protection applies.
#[inline]
fn cache_put(&self, key: OutPointKey, utxo: Arc<UTXO>, block_height: u32) {
    let gen = self.next_cache_generation();
    self.cache.insert(
        key,
        UtxoCacheSlot {
            generation: gen,
            utxo,
            block_height,
        },
    );
}
/// Record that UTXO ops through `max_block_height` are durable and wake
/// threads blocked in `wait_utxo_disk_through`.
pub fn note_utxo_flush_completed(&self, max_block_height: u64) {
    self.utxo_disk_commit_height
        .fetch_max(max_block_height, Ordering::Release);
    // Hold the mutex across notify so a waiter cannot re-check the height
    // and go to sleep between our store and the notification.
    let _held = self.utxo_barrier_mu.lock().expect("utxo barrier mu");
    self.utxo_barrier_cv.notify_all();
}
/// Highest block height currently known durable on disk.
#[inline]
pub fn utxo_disk_commit_height_snapshot(&self) -> u64 {
    self.utxo_disk_commit_height.load(Ordering::Acquire)
}
/// Block until UTXO ops through `min_height` are durable on disk.
pub fn wait_utxo_disk_through(&self, min_height: u64) {
    let mut guard = self.utxo_barrier_mu.lock().expect("utxo barrier mu");
    while self.utxo_disk_commit_height.load(Ordering::Acquire) < min_height {
        guard = self.utxo_barrier_cv.wait(guard).expect("utxo barrier cv");
    }
}
/// True when the Dynamic eviction strategy is active.
#[inline]
pub fn is_dynamic_eviction(&self) -> bool {
    self.eviction_strategy == EvictionStrategy::Dynamic
}
/// True when a bounded cache is at least 98% full; disk hits are then
/// served without re-caching to avoid immediate insert/evict churn.
#[inline]
pub(crate) fn skip_recache_disk_hits(&self) -> bool {
    self.max_entries_effective() != usize::MAX
        && self.cache.len().saturating_mul(100)
            >= self.max_entries_effective().saturating_mul(98)
}
/// True when a finite cache cap is configured.
#[inline]
pub(crate) fn max_entries_is_bounded(&self) -> bool {
    self.max_entries_effective() != usize::MAX
}
/// Copy any in-flight (drained-but-unflushed) inserts for `keys` into
/// `map`, skipping keys that are already present.
pub(crate) fn supplement_in_flight_for_keys(
    &self,
    keys: &[OutPointKey],
    map: &mut rustc_hash::FxHashMap<OutPointKey, Arc<UTXO>>,
) {
    if self.in_flight_insertions.is_empty() {
        return;
    }
    for key in keys.iter().filter(|k| !map.contains_key(*k)) {
        if let Some(entry) = self.in_flight_insertions.get(key) {
            map.insert(*key, Arc::clone(entry.value()));
            self.stats_pending_hits.fetch_add(1, Ordering::Relaxed);
        }
    }
}
/// Approximate number of entries in the pending flush log.
pub fn pending_len(&self) -> usize {
    self.pending_log_size.load(Ordering::Relaxed)
}
/// Size of the dynamic-eviction protection set (0 if the lock is poisoned).
pub fn recently_accessed_len(&self) -> usize {
    self.recently_accessed.lock().map(|g| g.len()).unwrap_or(0)
}
/// Number of drained-but-unflushed inserts currently tracked.
pub fn in_flight_len(&self) -> usize {
    self.in_flight_insertions.len()
}
/// Bound how many cache entries one eviction pass may examine: twice the
/// eviction target (at least 512), capped globally and by cache size.
fn eviction_scan_cap(&self, to_evict: usize) -> usize {
    let desired = to_evict.saturating_mul(2).max(512);
    desired
        .min(EVICT_SCAN_CAP)
        .min(self.cache.len().saturating_add(1))
}
/// FIFO/LIFO eviction: when the cache exceeds its cap, sample a bounded
/// slice of entries (skipping protected heights) and remove the oldest
/// (FIFO) or newest (LIFO) by insertion generation. No-op for unbounded
/// caches and for the Dynamic strategy (see `evict_if_needed`).
/// `evict_scratch` is caller-provided scratch to avoid reallocation.
#[doc(hidden)]
pub fn maybe_evict(&self, evict_scratch: &mut Vec<(OutPointKey, u64)>) {
    if self.max_entries_effective() == usize::MAX {
        return;
    }
    if self.eviction_strategy == EvictionStrategy::Dynamic {
        return;
    }
    let len = self.cache.len();
    if len <= self.max_entries_effective() {
        return;
    }
    let to_evict = len - self.max_entries_effective();
    let scan_cap = self.eviction_scan_cap(to_evict);
    evict_scratch.clear();
    for r in self.cache.iter() {
        if evict_scratch.len() >= scan_cap {
            break;
        }
        let v = r.value();
        // Never evict entries pinned by a protected block height.
        if v.block_height != UNPROTECTED_HEIGHT
            && self.protected_heights.contains(&v.block_height)
        {
            continue;
        }
        evict_scratch.push((*r.key(), v.generation));
    }
    // LIFO removes the newest generations first; FIFO the oldest.
    if self.eviction_strategy == EvictionStrategy::Lifo {
        evict_scratch.sort_by_key(|(_, g)| std::cmp::Reverse(*g));
    } else {
        evict_scratch.sort_by_key(|(_, g)| *g);
    }
    let pending_now = self.pending_log_size.load(Ordering::Relaxed);
    let mut evicted = 0;
    for (key, _) in evict_scratch.iter() {
        if evicted >= to_evict {
            break;
        }
        // remove() can miss if another thread already evicted the key.
        if self.cache.remove(key).is_some() {
            evicted += 1;
            self.stats_evictions.fetch_add(1, Ordering::Relaxed);
        }
    }
    if evicted > 0 {
        debug!(
            "IbdUtxoStore: evicted {} entries (cache over limit, pending={})",
            evicted, pending_now
        );
    }
}
/// Replace the dynamic-eviction protection set with the cached subset of
/// `keys` (keys expected to be spent by upcoming blocks). No-op for
/// non-Dynamic strategies.
pub fn protect_keys_for_next_blocks(&self, keys: &[OutPointKey]) {
    if self.eviction_strategy != EvictionStrategy::Dynamic {
        return;
    }
    // A poisoned lock simply skips the refresh, matching the best-effort
    // nature of this hint.
    let Ok(mut recent) = self.recently_accessed.lock() else {
        return;
    };
    recent.clear();
    recent.extend(keys.iter().filter(|k| self.cache.contains_key(*k)).copied());
}
/// Dynamic eviction pass. Triggers only when the cache is >10% over its
/// cap; evicts down to 90% of the cap, preferring very old, low-value
/// UTXOs and sparing recently-accessed keys and protected heights.
/// Returns the number of entries actually removed.
pub fn evict_if_needed(&self, current_height: u64) -> usize {
    if self.eviction_strategy != EvictionStrategy::Dynamic {
        return 0;
    }
    if self.max_entries_effective() == usize::MAX {
        return 0;
    }
    let len = self.cache.len();
    // Hysteresis: only start once 10% over the cap ...
    let trigger = self.max_entries_effective() + self.max_entries_effective() / 10;
    if len <= trigger {
        return 0;
    }
    // ... then evict down to 90% of the cap.
    let target = self.max_entries_effective() * 9 / 10;
    let to_evict = len.saturating_sub(target);
    if to_evict == 0 {
        return 0;
    }
    // Entries younger than EVICT_MIN_AGE_BLOCKS are never candidates;
    // entries older than EVICT_VERY_OLD_BLOCKS are preferred victims.
    let min_evictable_height = current_height.saturating_sub(EVICT_MIN_AGE_BLOCKS);
    let very_old_threshold = current_height.saturating_sub(EVICT_VERY_OLD_BLOCKS);
    let mut recent = self.recently_accessed.lock().expect("lock");
    // Oversample (4x) because many scanned entries will be filtered out.
    let scan_cap = self.eviction_scan_cap(to_evict.saturating_mul(4));
    let mut candidates: Vec<(OutPointKey, i64, u64)> = Vec::new();
    for r in self.cache.iter() {
        if candidates.len() >= scan_cap {
            break;
        }
        let k = *r.key();
        if recent.contains(&k) {
            continue;
        }
        if !self.protected_heights.is_empty() {
            let v = r.value();
            if v.block_height != UNPROTECTED_HEIGHT
                && self.protected_heights.contains(&v.block_height)
            {
                continue;
            }
        }
        let utxo = r.value().utxo.as_ref();
        if utxo.height > min_evictable_height {
            continue;
        }
        candidates.push((k, utxo.value, utxo.height));
    }
    // Very-old entries first; within a class, lowest (value, height) first.
    candidates.sort_by(|a, b| {
        let very_old_a = a.2 < very_old_threshold;
        let very_old_b = b.2 < very_old_threshold;
        match (very_old_a, very_old_b) {
            (true, false) => std::cmp::Ordering::Less,
            (false, true) => std::cmp::Ordering::Greater,
            _ => (a.1, a.2).cmp(&(b.1, b.2)),
        }
    });
    let mut evicted = 0;
    for (key, _, _) in candidates.into_iter().take(to_evict) {
        if self.cache.remove(&key).is_some() {
            evicted += 1;
            self.stats_evictions.fetch_add(1, Ordering::Relaxed);
        }
    }
    // The protection set is consumed per pass; callers repopulate it via
    // protect_keys_for_next_blocks.
    recent.clear();
    if evicted > 0 {
        debug!(
            "IbdUtxoStore: evicted {} entries (dynamic, cache was over limit)",
            evicted
        );
    }
    evicted
}
/// Emergency eviction under RSS pressure: drop all but 1/8 of the cap,
/// sparing only protected heights. Runs regardless of eviction strategy.
pub fn evict_aggressive_for_rss(&self) {
    let len = self.cache.len();
    if len == 0 {
        return;
    }
    let keep = self.max_entries_effective() / 8;
    let to_evict = len.saturating_sub(keep);
    if to_evict == 0 {
        return;
    }
    // NOTE(review): the evicted count is derived from the global counter
    // delta, so concurrent evictions elsewhere can inflate the logged
    // number; the retain itself is still correct.
    let evicted_before = self.stats_evictions.load(Ordering::Relaxed);
    let mut remaining = to_evict;
    self.cache.retain(|_k, v| {
        if remaining == 0 {
            return true;
        }
        if v.block_height != UNPROTECTED_HEIGHT
            && self.protected_heights.contains(&v.block_height)
        {
            return true;
        }
        remaining -= 1;
        self.stats_evictions.fetch_add(1, Ordering::Relaxed);
        false
    });
    let evicted = self.stats_evictions.load(Ordering::Relaxed) - evicted_before;
    if evicted > 0 {
        tracing::warn!(
            "IbdUtxoStore: EMERGENCY evict {} of {} entries (keep {}, protected_heights={})",
            evicted,
            len,
            keep,
            self.protected_heights.len(),
        );
    }
}
/// Seed the cache with the genesis coinbase's first output (index 0),
/// if the block has a coinbase with outputs and the UTXO isn't cached yet.
pub fn bootstrap_genesis(&self, genesis_block: &blvm_protocol::types::Block) {
    if genesis_block.transactions.is_empty() {
        return;
    }
    let tx_ids = compute_block_tx_ids(genesis_block);
    if tx_ids.is_empty() {
        return;
    }
    let coinbase = &genesis_block.transactions[0];
    if !is_coinbase(coinbase) || coinbase.outputs.is_empty() {
        return;
    }
    let outpoint = OutPoint {
        hash: tx_ids[0],
        index: 0,
    };
    let output = &coinbase.outputs[0];
    let utxo = UTXO {
        value: output.value,
        script_pubkey: output.script_pubkey.as_slice().into(),
        height: 0,
        is_coinbase: true,
    };
    let key = outpoint_to_key(&outpoint);
    // Idempotent: re-running bootstrap must not double-count the UTXO.
    if self.cache.get(&key).is_none() {
        self.cache_put(key, Arc::new(utxo), UNPROTECTED_HEIGHT);
        self.total_utxo_count.fetch_add(1, Ordering::Relaxed);
        self.maybe_evict_tl();
    }
}
/// Look up a UTXO by key in the in-memory cache only, cloning it out.
/// Counts a cache hit when found; pending/disk tiers are not consulted.
#[inline]
pub fn get(&self, key: &OutPointKey) -> Option<UTXO> {
    match self.cache.get(key) {
        Some(slot) => {
            self.stats_cache_hits.fetch_add(1, Ordering::Relaxed);
            Some((*slot.utxo).clone())
        }
        None => None,
    }
}
/// Cache a UTXO as an unprotected entry, then run eviction once.
#[inline]
pub fn insert(&self, key: OutPointKey, utxo: UTXO) {
    self.cache_put(key, Arc::new(utxo), UNPROTECTED_HEIGHT);
    self.maybe_evict_tl();
}
/// Drop a key from the in-memory cache only (disk/pending untouched).
#[inline]
pub fn remove(&self, key: &OutPointKey) {
    self.cache.remove(key);
}
/// Borrow the cache slot for `key`, if cached (no stats update).
#[inline]
pub fn cache_get(
    &self,
    key: &OutPointKey,
) -> Option<dashmap::mapref::one::Ref<'_, OutPointKey, UtxoCacheSlot>> {
    self.cache.get(key)
}
/// Cache a single Arc'd UTXO unprotected, then run eviction once.
#[inline]
pub fn cache_insert_and_track(&self, key: OutPointKey, arc: Arc<UTXO>) {
    self.cache_put(key, arc, UNPROTECTED_HEIGHT);
    self.maybe_evict_tl();
}
/// Cache a batch of (key, utxo) pairs unprotected, then run eviction
/// once for the whole batch.
pub fn cache_insert_and_track_batch(&self, pairs: &[(OutPointKey, Arc<UTXO>)]) {
    if pairs.is_empty() {
        return;
    }
    for (key, utxo) in pairs.iter() {
        self.cache_put(*key, Arc::clone(utxo), UNPROTECTED_HEIGHT);
    }
    self.maybe_evict_tl();
}
/// Build a fresh UTXO map for `keys` using a temporary miss buffer.
pub fn build_utxo_map(&self, keys: &[OutPointKey]) -> UtxoSet {
    let mut out = UtxoSet::default();
    let mut scratch = Vec::new();
    self.supplement_utxo_map_with_buf(&mut out, keys, &mut scratch);
    out
}
/// Like `build_utxo_map`, but fills a caller-owned map (cleared first).
#[inline]
pub fn build_utxo_map_into(&self, keys: &[OutPointKey], map: &mut UtxoSet) {
    map.clear();
    let mut scratch = Vec::new();
    self.supplement_utxo_map_with_buf(map, keys, &mut scratch);
}
/// Like `build_utxo_map_into`, but reuses a caller-owned miss buffer to
/// avoid per-call allocation.
pub fn build_utxo_map_into_with_buf(
    &self,
    keys: &[OutPointKey],
    map: &mut UtxoSet,
    cache_misses_buf: &mut Vec<OutPointKey>,
) {
    map.clear();
    self.supplement_utxo_map_with_buf(map, keys, cache_misses_buf);
}
/// Build a UTXO map for `keys`, probing the cache in parallel and then
/// resolving misses through the sequential supplement path (in-flight
/// inserts, then disk). `cache_misses_buf` is caller scratch.
#[cfg(feature = "production")]
pub fn build_utxo_map_parallel(
    &self,
    keys: &[OutPointKey],
    map: &mut UtxoSet,
    cache_misses_buf: &mut Vec<OutPointKey>,
) {
    use blvm_protocol::rayon::prelude::*;
    const PAR_THRESHOLD: usize = 32;
    // Small lookups: rayon overhead isn't worth it.
    if keys.len() <= PAR_THRESHOLD {
        map.clear();
        return self.supplement_utxo_map_with_buf(map, keys, cache_misses_buf);
    }
    // Probe the cache in parallel, partitioning keys into hits and misses.
    let (hits, misses): (Vec<_>, Vec<_>) = keys
        .par_iter()
        .map(|key| {
            if let Some(ref r) = self.cache.get(key) {
                self.stats_cache_hits.fetch_add(1, Ordering::Relaxed);
                (Some((key_to_outpoint(key), Arc::clone(&r.utxo))), None)
            } else {
                (None, Some(*key))
            }
        })
        .unzip();
    map.clear();
    map.reserve(keys.len());
    for (op, utxo) in hits.into_iter().flatten() {
        map.insert(op, utxo);
    }
    let disk_keys: Vec<OutPointKey> = misses.into_iter().flatten().collect();
    if !disk_keys.is_empty() {
        // Pass the misses directly; the previous round-trip through
        // cache_misses_buf (assign + mem::take) was a needless shuffle.
        // The buffer is still handed down for reuse by the supplement path.
        self.supplement_utxo_map_with_buf(map, &disk_keys, cache_misses_buf);
    }
}
/// Fill `map` with UTXOs for `keys`, checking in order: entries already in
/// `map`, the hot cache, the in-flight insert map, then disk.
/// `cache_misses_buf` is caller-provided scratch (cleared and reused).
/// Keys that resolve nowhere are logged as `[UTXO_TOTAL_MISS]`.
pub fn supplement_utxo_map_with_buf(
    &self,
    map: &mut UtxoSet,
    keys: &[OutPointKey],
    cache_misses_buf: &mut Vec<OutPointKey>,
) {
    cache_misses_buf.clear();
    for key in keys {
        let op = key_to_outpoint(key);
        if map.contains_key(&op) {
            continue;
        }
        if let Some(ref r) = self.cache.get(key) {
            self.stats_cache_hits.fetch_add(1, Ordering::Relaxed);
            map.insert(op, Arc::clone(&r.utxo));
            continue;
        }
        cache_misses_buf.push(*key);
    }
    if !cache_misses_buf.is_empty() && !self.memory_only {
        // Bounded caches may have evicted entries that are still only
        // in-flight to disk; satisfy those before touching the disk.
        if self.max_entries_effective() != usize::MAX && !self.in_flight_insertions.is_empty() {
            cache_misses_buf.retain(|key| {
                let op = key_to_outpoint(key);
                if map.contains_key(&op) {
                    return false;
                }
                if let Some(arc) = self.in_flight_insertions.get(key) {
                    map.insert(op, Arc::clone(arc.value()));
                    self.stats_pending_hits.fetch_add(1, Ordering::Relaxed);
                    return false;
                }
                true
            });
        }
        let to_load = std::mem::take(cache_misses_buf);
        let load_count = to_load.len();
        if load_count == 0 {
            return;
        }
        if let Ok((loaded, keys_scanned)) = load_keys_from_disk(Arc::clone(&self.disk), to_load)
        {
            self.stats_disk_loads
                .fetch_add(load_count as u64, Ordering::Relaxed);
            // Near-full caches skip re-inserting disk hits to avoid
            // immediate insert/evict churn (see skip_recache_disk_hits).
            let skip_recache = self.skip_recache_disk_hits();
            if skip_recache {
                for (key, utxo) in loaded {
                    let arc = Arc::new(utxo);
                    map.insert(key_to_outpoint(&key), Arc::clone(&arc));
                }
            } else {
                let mut pairs: Vec<(OutPointKey, Arc<UTXO>)> = Vec::with_capacity(loaded.len());
                for (key, utxo) in loaded {
                    let arc = Arc::new(utxo);
                    map.insert(key_to_outpoint(&key), Arc::clone(&arc));
                    pairs.push((key, arc));
                }
                if !pairs.is_empty() {
                    self.cache_insert_and_track_batch(&pairs);
                }
            }
            // Re-check in-flight inserts: a pending-log drain may have
            // raced the disk load above.
            if self.max_entries_effective() != usize::MAX
                && !self.in_flight_insertions.is_empty()
            {
                for key in &keys_scanned {
                    let op = key_to_outpoint(key);
                    if map.contains_key(&op) {
                        continue;
                    }
                    if let Some(arc) = self.in_flight_insertions.get(key) {
                        map.insert(op, Arc::clone(arc.value()));
                        self.stats_pending_hits.fetch_add(1, Ordering::Relaxed);
                    }
                }
            }
            // Anything still missing failed every tier; emit diagnostics.
            for key in &keys_scanned {
                let op = key_to_outpoint(key);
                if !map.contains_key(&op) {
                    let in_cache = self.cache.get(key).is_some();
                    let in_inflight = self.max_entries_effective() != usize::MAX
                        && self.in_flight_insertions.contains_key(key);
                    tracing::error!(
                        "[UTXO_TOTAL_MISS] key={} in_cache={} in_inflight={} protected_len={} pending_len={} cache_len={}",
                        hex::encode(key),
                        in_cache,
                        in_inflight,
                        self.protected_heights.len(),
                        self.pending_log_size.load(Ordering::Relaxed),
                        self.cache.len(),
                    );
                }
            }
        }
    }
}
/// Run `maybe_evict` with a thread-local scratch vector so hot paths
/// avoid re-allocating the candidate buffer on every call.
pub(crate) fn maybe_evict_tl(&self) {
    thread_local! {
        static TL_EVICT_SCRATCH: std::cell::RefCell<Vec<(OutPointKey, u64)>> =
            const { std::cell::RefCell::new(Vec::new()) };
    }
    TL_EVICT_SCRATCH.with(|cell| {
        self.maybe_evict(&mut cell.borrow_mut());
    });
}
/// Cache a worker's block additions with their height marked protected,
/// so eviction cannot drop them before the block is flushed.
pub fn worker_cache_put_protected(
    &self,
    additions: &rustc_hash::FxHashMap<blvm_protocol::OutPoint, Arc<UTXO>>,
    height: u64,
) {
    if additions.is_empty() {
        return;
    }
    let tag = height as u32;
    self.protected_heights.insert(tag);
    for (outpoint, utxo) in additions.iter() {
        self.cache_put(outpoint_to_key(outpoint), Arc::clone(utxo), tag);
    }
}
/// Apply one block's sync batch: adjust the live-UTXO count, apply deletes
/// and height-protected inserts to the cache, track inserts as in-flight
/// (bounded caches only), and append everything to the pending flush log.
///
/// Fix: removed the dead `let total = …; let _ = total;` computation that
/// was never used.
pub fn apply_sync_batch(&self, batch: &SyncBatch, block_height: u64) {
    self.total_utxo_count
        .fetch_add(batch.total_delta, Ordering::Relaxed);
    for key in &batch.deletes {
        self.remove(key);
    }
    let h = block_height as u32;
    if !batch.inserts.is_empty() {
        // Pin this height so eviction cannot drop UTXOs the next blocks
        // may need to spend.
        self.protected_heights.insert(h);
    }
    for (key, value) in &batch.inserts {
        self.cache_put(*key, Arc::clone(value), h);
        if self.eviction_strategy == EvictionStrategy::Dynamic {
            if let Ok(mut recent) = self.recently_accessed.lock() {
                recent.insert(*key);
            }
        }
    }
    if self.max_entries_effective() != usize::MAX && !batch.inserts.is_empty() {
        // Bounded cache: keep inserts reachable while they wait in the
        // pending log (they may be evicted before the flush lands).
        for (key, arc) in &batch.inserts {
            self.in_flight_insertions
                .entry(*key)
                .or_insert_with(|| Arc::clone(arc));
        }
    }
    self.push_to_pending_shards(
        batch
            .deletes
            .iter()
            .map(|k| (*k, None))
            .chain(batch.inserts.iter().map(|(k, v)| (*k, Some(Arc::clone(v))))),
        block_height,
    );
    self.maybe_evict_tl();
}
/// Apply a consensus `UtxoDelta` for one block: deletes first, then
/// height-protected insertions (unless the worker already cached them),
/// then in-flight tracking and the pending-log append.
/// `del_scratch` / `add_scratch` are caller scratch buffers.
pub fn apply_utxo_delta(
    &self,
    delta: &blvm_protocol::block::UtxoDelta,
    block_height: u64,
    del_scratch: &mut Vec<OutPointKey>,
    add_scratch: &mut Vec<(OutPointKey, Arc<UTXO>)>,
    additions_already_in_cache: bool,
) {
    let total_delta = delta.additions.len() as isize - delta.deletions.len() as isize;
    self.total_utxo_count
        .fetch_add(total_delta, Ordering::Relaxed);
    let dynamic = self.eviction_strategy == EvictionStrategy::Dynamic;
    del_scratch.clear();
    del_scratch.reserve(delta.deletions.len());
    for dk in &delta.deletions {
        let key = consensus_deletion_key_to_store_key(dk);
        self.remove(&key);
        del_scratch.push(key);
    }
    add_scratch.clear();
    add_scratch.reserve(delta.additions.len());
    if additions_already_in_cache {
        // Worker already cached them (worker_cache_put_protected); only
        // collect the keys for the pending log below.
        for (op, arc) in delta.additions.iter() {
            let key = outpoint_to_key(op);
            add_scratch.push((key, Arc::clone(arc)));
        }
    } else {
        let h = block_height as u32;
        if !delta.additions.is_empty() {
            // Pin the height so fresh additions survive until flushed.
            self.protected_heights.insert(h);
        }
        for (op, arc) in delta.additions.iter() {
            let key = outpoint_to_key(op);
            self.cache_put(key, Arc::clone(arc), h);
            add_scratch.push((key, Arc::clone(arc)));
        }
    }
    if self.max_entries_effective() != usize::MAX && !add_scratch.is_empty() {
        // Bounded cache: keep additions reachable even if evicted before
        // the flush lands on disk.
        for (key, arc) in add_scratch.iter() {
            self.in_flight_insertions
                .entry(*key)
                .or_insert_with(|| Arc::clone(arc));
        }
    }
    self.push_to_pending_shards(
        del_scratch.iter().map(|&k| (k, None)).chain(
            add_scratch
                .iter()
                .map(|(k, arc)| (*k, Some(Arc::clone(arc)))),
        ),
        block_height,
    );
    if dynamic {
        // Mark fresh additions as recently accessed so dynamic eviction
        // spares them.
        if let Ok(mut recent) = self.recently_accessed.lock() {
            recent.reserve(delta.additions.len());
            for op in delta.additions.keys() {
                let key = outpoint_to_key(op);
                recent.insert(key);
            }
        }
    }
}
/// Append operations to the sharded pending log. Items are bucketed by
/// shard first so each shard mutex is taken at most once per call.
fn push_to_pending_shards<I>(&self, items: I, block_height: u64)
where
    I: IntoIterator<Item = (OutPointKey, PendingValue)>,
{
    let mut staged: [Vec<PendingLogEntry>; PENDING_SHARDS] = Default::default();
    let mut count = 0usize;
    for (key, value) in items {
        staged[pending_shard_idx(&key)].push((key, value, block_height));
        count += 1;
    }
    if count == 0 {
        return;
    }
    for (idx, bucket) in staged.iter_mut().enumerate() {
        if bucket.is_empty() {
            continue;
        }
        self.pending_shards[idx]
            .lock()
            .expect("pending shard lock")
            .append(bucket);
    }
    self.pending_log_size.fetch_add(count, Ordering::Relaxed);
}
/// Drain every pending shard into one vector.
///
/// Fix: the size counter is now decremented with an atomic
/// read-modify-write (`fetch_update`). The previous load → subtract →
/// store sequence could overwrite a concurrent writer's `fetch_add`,
/// permanently losing counts. Saturating keeps it from wrapping if a
/// writer has appended entries but not yet bumped the counter.
fn drain_all_pending_shards(&self) -> Vec<PendingLogEntry> {
    let approx = self.pending_log_size.load(Ordering::Relaxed);
    let mut all = Vec::with_capacity(approx);
    for shard in self.pending_shards.iter() {
        let mut s = shard.lock().expect("pending shard lock");
        if !s.is_empty() {
            all.append(&mut *s);
        }
    }
    let taken = all.len();
    // fetch_update retries on contention; Some(..) means it always lands.
    let _ = self
        .pending_log_size
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
            Some(cur.saturating_sub(taken))
        });
    all
}
/// Drain only pending entries at or below `max_block_height_inclusive`,
/// leaving newer entries in their shards (shard-internal order is not
/// preserved, since removal uses `swap_remove`).
///
/// Fix: the size counter is decremented via atomic `fetch_update` instead
/// of a racy load → subtract → store pair, which could drop a concurrent
/// writer's `fetch_add`. Saturating subtraction prevents wraparound if a
/// writer appended entries but has not yet bumped the counter.
fn drain_pending_through_height(
    &self,
    max_block_height_inclusive: u64,
) -> Vec<PendingLogEntry> {
    let approx = self.pending_log_size.load(Ordering::Relaxed);
    let mut all = Vec::with_capacity(approx.min(65536));
    let mut drained = 0usize;
    for shard in self.pending_shards.iter() {
        let mut s = shard.lock().expect("pending shard lock");
        if s.is_empty() {
            continue;
        }
        let mut i = 0;
        while i < s.len() {
            if s[i].2 <= max_block_height_inclusive {
                all.push(s.swap_remove(i));
                drained += 1;
            } else {
                i += 1;
            }
        }
    }
    let _ = self
        .pending_log_size
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
            Some(cur.saturating_sub(drained))
        });
    all
}
/// Fast emptiness check: trusts the (approximate) shared counter rather
/// than locking every shard.
fn all_pending_shards_empty(&self) -> bool {
    self.pending_log_size.load(Ordering::Relaxed) == 0
}
/// Track every insert in a drained package as in-flight so bounded caches
/// can still serve it if it gets evicted before the flush lands. No-op
/// for unbounded caches.
fn register_in_flight(&self, pkg: &PendingFlushPackage) {
    if self.max_entries_effective() == usize::MAX {
        return;
    }
    let inserts = pkg
        .ops
        .iter()
        .filter_map(|(key, value)| value.as_ref().map(|arc| (key, arc)));
    for (key, arc) in inserts {
        self.in_flight_insertions
            .entry(*key)
            .or_insert_with(|| Arc::clone(arc));
    }
}
/// Take a flush batch if thresholds are met, with no height limit.
pub fn maybe_take_flush_batch(&self) -> Option<PendingFlushPackage> {
    self.maybe_take_flush_batch_through(u64::MAX)
}
/// Take a flush batch once the pending log reaches either the configured
/// threshold or 20% of the cache cap (whichever comes first); only entries
/// at or below `max_block_height_inclusive` are drained.
pub fn maybe_take_flush_batch_through(
    &self,
    max_block_height_inclusive: u64,
) -> Option<PendingFlushPackage> {
    let secondary = if self.max_entries_effective() == usize::MAX {
        usize::MAX
    } else {
        (self.max_entries_effective() * 20 / 100).max(1)
    };
    let n = self.pending_log_size.load(Ordering::Relaxed);
    if n < self.flush_threshold && n < secondary {
        return None;
    }
    let raw = self.drain_pending_through_height(max_block_height_inclusive);
    let pkg = pack_flush_package(raw)?;
    // Keep drained inserts visible to readers until they hit disk.
    self.register_in_flight(&pkg);
    Some(pkg)
}
/// Take a flush batch unconditionally (ignores thresholds), no height limit.
pub fn take_flush_batch_force(&self) -> Option<PendingFlushPackage> {
    self.take_flush_batch_force_through(u64::MAX)
}
/// Take a flush batch unconditionally, draining only entries at or below
/// `max_block_height_inclusive`.
pub fn take_flush_batch_force_through(
    &self,
    max_block_height_inclusive: u64,
) -> Option<PendingFlushPackage> {
    if self.all_pending_shards_empty() {
        return None;
    }
    let raw = self.drain_pending_through_height(max_block_height_inclusive);
    let pkg = pack_flush_package(raw)?;
    // Keep drained inserts visible to readers until they hit disk.
    self.register_in_flight(&pkg);
    Some(pkg)
}
/// Drain everything still pending (all heights, all shards) into one
/// package; `None` when the log is empty.
pub fn take_remaining_flush_package(&self) -> Option<PendingFlushPackage> {
    if self.all_pending_shards_empty() {
        return None;
    }
    let pkg = pack_flush_package(self.drain_all_pending_shards())?;
    self.register_in_flight(&pkg);
    Some(pkg)
}
/// Write a drained batch to disk in `MAX_BATCH_OPS`-sized chunks, then
/// clear in-flight tracking and (if the cache is over its cap) evict the
/// just-flushed inserts. Returns the number of operations written.
pub fn flush_pending_batch(&self, batch: &[(OutPointKey, PendingValue)]) -> Result<usize> {
    if batch.is_empty() {
        return Ok(0);
    }
    let mut total_flushed = 0;
    // Reused serialization buffer; 192 bytes covers typical UTXO rows.
    let mut ser_buf = Vec::with_capacity(192);
    for chunk in batch.chunks(MAX_BATCH_OPS) {
        let mut b = self.disk.batch()?;
        for (key, value_opt) in chunk {
            match value_opt {
                Some(arc) => {
                    ser_buf.clear();
                    bincode::serialize_into(&mut ser_buf, arc.as_ref())
                        .map_err(|e| anyhow::anyhow!("UTXO serialize: {}", e))?;
                    b.put(key.as_slice(), ser_buf.as_slice());
                }
                None => b.delete(key.as_slice()),
            }
        }
        // NOTE(review): commit_no_wal is not part of the Tree/BatchWriter
        // surface visible in this file — presumably a default/extension
        // method that skips write-ahead logging; confirm durability
        // expectations at the call sites.
        b.commit_no_wal()?;
        total_flushed += chunk.len();
    }
    debug!("IbdUtxoStore: flushed {} operations to disk", total_flushed);
    if self.max_entries_effective() != usize::MAX {
        // Now durable on disk: stop tracking these keys as in-flight.
        for (key, _value_opt) in batch {
            self.in_flight_insertions.remove(key);
        }
    }
    if self.max_entries_effective() != usize::MAX
        && self.cache.len() > self.max_entries_effective()
    {
        // Prefer evicting entries we just made durable; stop as soon as
        // the cache is back under its cap.
        let mut evicted = 0;
        for (key, value_opt) in batch {
            if value_opt.is_some() {
                if self.cache.remove(key).is_some() {
                    evicted += 1;
                }
                if self.cache.len() <= self.max_entries_effective() {
                    break;
                }
            }
        }
        if evicted > 0 {
            debug!(
                "IbdUtxoStore: evicted {} flushed entries (cache over limit)",
                evicted
            );
        }
    }
    Ok(total_flushed)
}
/// Write a pre-serialized package to disk in `MAX_BATCH_OPS` chunks,
/// optionally folding each operation into the running MuHash accumulator
/// (inserts add the new coin; deletes remove the coin previously stored
/// on disk). Afterwards clears in-flight tracking and, if the cache is
/// over its cap, evicts the just-flushed inserts.
pub fn flush_prepared_package(
    &self,
    pkg: &PreparedFlushPackage,
    mut muhash: Option<&mut MuHash3072>,
) -> Result<usize> {
    let mut total_flushed = 0;
    let slab = pkg.slab.as_slice();
    for chunk in pkg.rows.chunks(MAX_BATCH_OPS) {
        if chunk.is_empty() {
            continue;
        }
        // MuHash must be updated BEFORE the batch commit below: deletes
        // read the old row from disk, which the commit would remove.
        if ibd_per_op_muhash_enabled() {
            if let Some(mhref) = muhash.as_mut() {
                for (key, value_opt) in chunk {
                    match value_opt {
                        Some((start, len)) => {
                            let utxo: UTXO =
                                bincode::deserialize(&slab[*start as usize..][..*len as usize])
                                    .map_err(|e| {
                                        anyhow::anyhow!("UTXO deserialize for muhash: {}", e)
                                    })?;
                            let op = key_to_outpoint(key);
                            let pre = utxo_muhash_preimage_ibd(&op, &utxo);
                            mhref.insert_mut(&pre);
                        }
                        None => {
                            // Delete with no durable row: the coin was
                            // created and spent within the unflushed
                            // window, so it never entered the MuHash.
                            let Some(disk_bytes) = self.disk.get(key.as_slice())? else {
                                debug!(
                                    "IbdUtxoStore: MuHash skip delete (no SST row; net-no-op vs durable set), key_prefix={}",
                                    hex::encode(&key[..8])
                                );
                                continue;
                            };
                            let utxo: UTXO =
                                bincode::deserialize(&disk_bytes).map_err(|e| {
                                    anyhow::anyhow!("disk UTXO deserialize for muhash: {}", e)
                                })?;
                            let op = key_to_outpoint(key);
                            let pre = utxo_muhash_preimage_ibd(&op, &utxo);
                            mhref.remove_mut(&pre);
                        }
                    }
                }
            }
        }
        let mut b = self.disk.batch()?;
        for (key, value_opt) in chunk {
            match value_opt {
                Some((start, len)) => {
                    b.put(key.as_slice(), &slab[*start as usize..][..*len as usize])
                }
                None => b.delete(key.as_slice()),
            }
        }
        b.commit_no_wal()?;
        total_flushed += chunk.len();
    }
    if total_flushed == 0 {
        return Ok(0);
    }
    debug!(
        "IbdUtxoStore: flushed {} prepared operations to disk",
        total_flushed
    );
    if self.max_entries_effective() != usize::MAX {
        // Durable on disk: stop tracking these keys as in-flight.
        for (key, _value_opt) in pkg.rows.iter() {
            self.in_flight_insertions.remove(key);
        }
    }
    if self.max_entries_effective() != usize::MAX
        && self.cache.len() > self.max_entries_effective()
    {
        // Prefer evicting entries just made durable; stop once under cap.
        let mut evicted = 0;
        for (key, value_opt) in pkg.rows.iter() {
            if value_opt.is_some() {
                if self.cache.remove(key).is_some() {
                    evicted += 1;
                }
                if self.cache.len() <= self.max_entries_effective() {
                    break;
                }
            }
        }
        if evicted > 0 {
            debug!(
                "IbdUtxoStore: evicted {} flushed entries (cache over limit)",
                evicted
            );
        }
    }
    Ok(total_flushed)
}
/// Number of entries currently in the in-memory cache (not total UTXOs;
/// see `total_count`).
pub fn len(&self) -> usize {
    self.cache.len()
}
/// True when the in-memory cache holds no entries.
pub fn is_empty(&self) -> bool {
    self.cache.is_empty()
}
pub fn to_utxo_set_snapshot(&self) -> UtxoSet {
self.cache
.iter()
.map(|r| {
let key = r.key();
let slot = r.value();
(key_to_outpoint(key), Arc::clone(&slot.utxo))
})
.collect()
}
/// Net live-UTXO count (inserts minus deletes) accumulated so far.
pub fn total_count(&self) -> isize {
    self.total_utxo_count.load(Ordering::Relaxed)
}
/// Cheap handle clone of the backing tree.
pub fn disk_clone(&self) -> Arc<dyn Tree> {
    Arc::clone(&self.disk)
}
/// Ask the backing tree to persist. (flush_to_disk is not part of the
/// Tree surface visible in this file — presumably a default/extension
/// method; semantics depend on the Tree implementation.)
pub fn flush_disk(&self) -> Result<()> {
    self.disk.flush_to_disk()
}
/// Counters: (disk_loads, cache_hits, evictions, pending_hits).
pub fn stats(&self) -> (u64, u64, u64, u64) {
    (
        self.stats_disk_loads.load(Ordering::Relaxed),
        self.stats_cache_hits.load(Ordering::Relaxed),
        self.stats_evictions.load(Ordering::Relaxed),
        self.stats_pending_hits.load(Ordering::Relaxed),
    )
}
}