use super::disk_index::DiskIndex;
use super::memory_age::{MemoryAge, Pin};
use super::memory_run::MemoryRun;
use super::types::{OutputId, OutputKV};
use std::path::Path;
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
use std::sync::Arc;
#[cfg(test)]
extern crate tempfile;
// Number of `MemoryAge` tiers held by `UtxoIndex` (length of the `ages` array).
const K_AGES: usize = 7;
// Ages with an index below this value are constructed as mutable in `UtxoIndex::open`;
// these are also the only ages touched by `append`'s merge polling and by `erase_since`.
// Doubles as the lower clamp for the eviction age.
const K_MUTABLE_AGES: usize = 3;
// Passed as the second argument to `MemoryAge::new` for every age.
// NOTE(review): presumably the number of runs merged together per compaction — confirm in memory_age.
const K_FAN_IN: usize = 8;
// Number of background worker threads spawned by `Compacter::start`.
const K_COMPACTER_THREADS: usize = 7;
/// Chooses the eviction age used when the index is opened.
///
/// The eviction age bounds how far merged runs are promoted in memory: a merge
/// of age `i` is pushed to age `i + 1` only while `i + 1` is below the eviction
/// age, otherwise it is written to the disk index.
///
/// Selection order:
/// 1. `BLVM_IBD_ENGINE_EVICTION_AGE`, if set to a valid unsigned integer, clamped
///    to `K_MUTABLE_AGES..=K_AGES`. An unparseable value is reported with a
///    warning and ignored.
/// 2. Otherwise derived from `avail_mb`: 5 at >= 65 GiB, 4 at >= 20 GiB, else 3.
///
/// `avail_mb` is the available memory in MiB. Total physical RAM is read from
/// `/proc/meminfo` for the log line only; it does not influence the result.
fn choose_eviction_age(avail_mb: u64) -> usize {
    if let Ok(raw) = std::env::var("BLVM_IBD_ENGINE_EVICTION_AGE") {
        match raw.trim().parse::<usize>() {
            Ok(n) => {
                let clamped = n.clamp(K_MUTABLE_AGES, K_AGES);
                tracing::info!(
                    "UTXO engine: eviction age = {} (from BLVM_IBD_ENGINE_EVICTION_AGE)",
                    clamped
                );
                return clamped;
            }
            Err(e) => {
                // Do not fail startup over a typo, but make the ignored override visible.
                tracing::warn!(
                    "UTXO engine: ignoring invalid BLVM_IBD_ENGINE_EVICTION_AGE={:?} ({}); using auto-detection",
                    raw,
                    e
                );
            }
        }
    }
    // Only used for reporting; falls back to the available figure when unreadable.
    let total_mb = proc_mem_total_mb().unwrap_or(avail_mb);
    let age = if avail_mb >= 65 * 1024 {
        5
    } else if avail_mb >= 20 * 1024 {
        4
    } else {
        3
    };
    tracing::info!(
        "UTXO engine: eviction age = {} (auto-detected: {:.1} GiB physical RAM, {:.1} GiB available)",
        age,
        total_mb as f64 / 1024.0,
        avail_mb as f64 / 1024.0,
    );
    age
}
/// Reads total physical memory, in MiB, from the `MemTotal:` line of `/proc/meminfo`.
///
/// Returns `None` when the file cannot be read, when no `MemTotal:` line exists,
/// or when the first such line has no parseable second field.
fn proc_mem_total_mb() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    let total_line = meminfo
        .lines()
        .find(|candidate| candidate.starts_with("MemTotal:"))?;
    // Second whitespace-separated field is the size; the file reports it in kB.
    let kib = total_line.split_whitespace().nth(1)?.parse::<u64>().ok()?;
    Some(kib / 1024)
}
/// Handle to the pool of background merge workers.
struct Compacter {
/// Sending side of the unbounded job queue. Each message is an age index to
/// merge; `usize::MAX` is the sentinel that makes one worker exit.
tx: crossbeam_channel::Sender<usize>,
/// Current eviction age. Workers load it before every merge; it is updated by
/// `UtxoIndex::memory_pressure_tick`.
eviction_age_live: Arc<AtomicUsize>,
/// Join handles of the worker threads. They are retained but never joined in
/// the code visible here.
_threads: Vec<std::thread::JoinHandle<()>>,
}
impl Compacter {
    /// Spawns `K_COMPACTER_THREADS` workers that pull age indices from a shared
    /// unbounded channel and run `run_merge_for_age` on each.
    ///
    /// Every worker owns a clone of the sender so a merge can schedule the next
    /// age itself. Because of that the channel never disconnects on its own;
    /// workers stop only when they receive the `usize::MAX` sentinel.
    ///
    /// Panics if a thread cannot be spawned.
    fn start(
        ages: Arc<[MemoryAge; K_AGES]>,
        disk_index: Arc<DiskIndex>,
        boot_eviction_age: usize,
    ) -> Self {
        let eviction_age_live = Arc::new(AtomicUsize::new(boot_eviction_age));
        let (tx, rx) = crossbeam_channel::unbounded::<usize>();
        let threads: Vec<std::thread::JoinHandle<()>> = (0..K_COMPACTER_THREADS)
            .map(|_| {
                let jobs = rx.clone();
                let requeue = tx.clone();
                let worker_ages = Arc::clone(&ages);
                let worker_disk = Arc::clone(&disk_index);
                let live_age = Arc::clone(&eviction_age_live);
                std::thread::Builder::new()
                    .name("utxo-compacter".to_string())
                    .spawn(move || loop {
                        let age_idx = match jobs.recv() {
                            // Sentinel or a closed channel: stop this worker.
                            Ok(usize::MAX) | Err(_) => break,
                            Ok(idx) => idx,
                        };
                        // Re-read per job so memory-pressure changes take effect immediately.
                        let eviction_age = live_age.load(Ordering::Acquire);
                        run_merge_for_age(
                            &worker_ages,
                            age_idx,
                            &worker_disk,
                            &requeue,
                            eviction_age,
                        );
                    })
                    .expect("spawn compacter thread")
            })
            .collect();
        Self {
            tx,
            eviction_age_live,
            _threads: threads,
        }
    }

    /// Queues a merge request for `age_idx`; a send failure is ignored.
    fn enqueue(&self, age_idx: usize) {
        let _ = self.tx.try_send(age_idx);
    }

    /// Sends one exit sentinel per worker. Does not wait for the workers to finish.
    fn shutdown(&self) {
        (0..K_COMPACTER_THREADS).for_each(|_| {
            let _ = self.tx.send(usize::MAX);
        });
    }
}
/// Performs one merge cycle for `ages[age_idx]`.
///
/// Takes the runs the age offers for merging (returns immediately if there are
/// none), merges them into a single run and, when the result is non-empty,
/// either promotes it to the next age (if that index is below `eviction_age`)
/// or writes it to `disk_index`. When the promoted-to age becomes merge-ready
/// its index is sent on `tx`. Finally the source age is told the merge is
/// complete.
///
/// A failed disk write is logged and the merge is still completed.
fn run_merge_for_age(
    ages: &[MemoryAge; K_AGES],
    age_idx: usize,
    disk_index: &DiskIndex,
    tx: &crossbeam_channel::Sender<usize>,
    eviction_age: usize,
) {
    let source = &ages[age_idx];
    let runs = match source.take_for_merge() {
        Some(taken) => taken,
        None => return,
    };
    let merged = MemoryRun::merge(&runs);
    // Largest second component of `height_range()` across the input runs.
    let max_h = runs
        .iter()
        .map(|run| run.height_range().1)
        .max()
        .unwrap_or(i32::MIN);

    if !merged.is_empty() {
        let promote_to = age_idx + 1;
        let stays_in_memory = promote_to < eviction_age;
        if stays_in_memory {
            let target = &ages[promote_to];
            target.push_frozen_run(Arc::new(merged));
            if target.merge_ready() {
                let _ = tx.send(promote_to);
            }
        } else {
            tracing::debug!(
                "UTXO engine: evicting {} entries to disk (age {} overflow)",
                merged.len(),
                age_idx,
            );
            if let Err(e) = disk_index.push_run(&merged) {
                tracing::error!("UTXO engine: disk eviction failed — data may be lost: {e}");
            }
        }
    }

    // Must run after the merged output has been handed off above.
    source.complete_merge(max_h, &runs);
}
/// UTXO index made of `K_AGES` in-memory ages backed by an on-disk index, with
/// background workers that merge runs between them.
pub struct UtxoIndex {
/// In-memory ages; index 0 receives appends. Shared with the compacter threads.
ages: Arc<[MemoryAge; K_AGES]>,
/// Background merge workers and the live eviction-age setting.
compacter: Compacter,
/// On-disk index that receives evicted runs and seeded segments.
disk_index: Arc<DiskIndex>,
/// Highest height recorded by `append` / seeding; starts at -1.
contiguous_length: AtomicI32,
/// Eviction age chosen at open time; the ceiling that memory-pressure handling restores to.
boot_eviction_age: usize,
}
impl UtxoIndex {
/// Opens an index whose disk segments live under `seg_dir`.
///
/// `avail_mb` (available memory in MiB) feeds `choose_eviction_age`. Builds the
/// `K_AGES` memory ages (the first `K_MUTABLE_AGES` mutable), opens the disk
/// index and starts the compacter threads.
///
/// # Errors
/// Returns the error from `DiskIndex::new` if the disk index cannot be opened.
pub fn open(seg_dir: &Path, avail_mb: u64) -> anyhow::Result<Self> {
    let boot_eviction_age = choose_eviction_age(avail_mb);
    let ages: Arc<[MemoryAge; K_AGES]> = Arc::new(std::array::from_fn(|slot| {
        MemoryAge::new(slot < K_MUTABLE_AGES, K_FAN_IN)
    }));
    let disk_index = Arc::new(DiskIndex::new(seg_dir)?);
    let compacter = Compacter::start(
        Arc::clone(&ages),
        Arc::clone(&disk_index),
        boot_eviction_age,
    );
    Ok(Self {
        ages,
        compacter,
        disk_index,
        contiguous_length: AtomicI32::new(-1),
        boot_eviction_age,
    })
}
/// Adjusts the live eviction age to the reported memory-pressure level and,
/// at higher levels, pushes work to the compacter and asks allocators to
/// return memory.
///
/// Level mapping (never above the boot eviction age):
/// - `0`: boot eviction age
/// - `1`: at most `K_MUTABLE_AGES + 2`
/// - `2`: at most `K_MUTABLE_AGES + 1`
/// - `3` and above: `K_MUTABLE_AGES`
///
/// Levels above 3 are treated like 3 so that the eviction age agrees with the
/// `>= 2` / `>= 3` checks below, rather than falling back to the boot value.
pub fn memory_pressure_tick(&self, level_u8: u8) {
    let boot = self.boot_eviction_age;
    let target = match level_u8 {
        0 => boot,
        1 => boot.min(K_MUTABLE_AGES + 2),
        2 => boot.min(K_MUTABLE_AGES + 1),
        _ => K_MUTABLE_AGES,
    };
    let live = &self.compacter.eviction_age_live;
    let prev = live.load(Ordering::Relaxed);
    if target != prev {
        // Release pairs with the Acquire load in the compacter workers.
        live.store(target, Ordering::Release);
        if target < prev {
            tracing::warn!(
                "UTXO engine: memory pressure level {} — eviction age {} → {} (spilling index to disk)",
                level_u8, prev, target
            );
        } else if level_u8 == 0 {
            tracing::info!(
                "UTXO engine: memory pressure cleared — eviction age restored to {}",
                target
            );
        }
    }
    if level_u8 >= 2 {
        // Kick every age that already has a merge pending, not just the mutable ones.
        for (idx, age) in self.ages.iter().enumerate() {
            if age.merge_ready() {
                self.compacter.enqueue(idx);
            }
        }
    }
    if level_u8 >= 3 {
        // SAFETY: plain FFI call taking a bool by value; no pointers are passed.
        #[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
        unsafe {
            libmimalloc_sys::mi_collect(true);
        }
        // SAFETY: plain FFI call taking an integer by value; no pointers are passed.
        // NOTE(review): malloc_trim is a glibc extension — confirm this builds on
        // non-glibc Linux targets.
        #[cfg(target_os = "linux")]
        unsafe {
            libc::malloc_trim(0);
        }
    }
}
/// Test helper: opens an index in a fresh temporary directory, reporting
/// 8 GiB of available memory.
#[cfg(test)]
pub fn new_for_test() -> Self {
    let scratch = tempfile::tempdir().expect("tempdir");
    let index = Self::open(scratch.path(), 8 * 1024).expect("UtxoIndex::open");
    // Deliberately leak the guard so the directory is not removed while the
    // returned index is still using it.
    std::mem::forget(scratch);
    index
}
/// Appends `entries` for block `height` to age 0 and returns the pin obtained
/// from `pin_height(height)` (taken before the entries are appended).
///
/// Afterwards raises `contiguous_length` to at least `height` and enqueues a
/// merge for every mutable age that reports it is ready.
///
/// The length is published with `Release` so that a thread observing it with an
/// `Acquire` load also observes the append that preceded it; this matches the
/// `Release` stores used by the seeding paths.
///
/// NOTE(review): `fetch_max` does not itself verify that all lower heights were
/// appended — callers presumably append in height order; confirm.
pub fn append(&self, entries: Vec<OutputKV>, height: i32) -> Pin {
    let newest = &self.ages[0];
    let pin = newest.pin_height(height);
    newest.append(entries, height);
    self.contiguous_length.fetch_max(height, Ordering::Release);
    for (idx, age) in self.ages.iter().enumerate().take(K_MUTABLE_AGES) {
        if age.merge_ready() {
            self.compacter.enqueue(idx);
        }
    }
    pin
}
pub fn lookup_key(&self, key: &[u8; 36]) -> Option<OutputId> {
for age in self.ages.iter() {
if let Some(id) = age.lookup_key(key, 0, i32::MAX) {
if id == super::types::OUTPUT_ID_DELETED {
return None;
}
return Some(id);
}
}
None
}
pub fn batch_query(&self, keys: &[[u8; 36]], ids: &mut [OutputId], before: i32) {
debug_assert_eq!(keys.len(), ids.len());
for age in self.ages.iter() {
if ids.iter().all(|id| *id != OutputId::MAX) {
break; }
age.batch_query(keys, ids, 0, before);
}
self.disk_index.batch_query(keys, ids, before);
}
/// Blocks the calling thread until `contiguous_length` reaches `height`,
/// polling once per millisecond. There is no timeout.
///
/// The load uses `Acquire` so it pairs with the `Release` stores performed when
/// the length is published by the seeding paths; a `Relaxed` load would not
/// establish any ordering with the data written before those stores.
pub fn wait_for_height(&self, height: i32) {
    while self.contiguous_length.load(Ordering::Acquire) < height {
        std::thread::sleep(std::time::Duration::from_millis(1));
    }
}
/// Returns the current value of the `contiguous_length` counter (-1 until
/// something is appended or seeded). Uses a relaxed load.
pub fn contiguous_length(&self) -> i32 {
self.contiguous_length.load(Ordering::Relaxed)
}
/// Seeds the index from a checkpoint.
///
/// Non-empty `entries` are sorted and written to the disk index as one sorted
/// segment. If that write fails the error is logged and the entries are
/// appended to age 0 instead. In every case `contiguous_length` is set to
/// `checkpoint_height` and the GC fence is moved there.
pub fn seed_checkpoint(&self, mut entries: Vec<OutputKV>, checkpoint_height: i32) {
    let has_entries = !entries.is_empty();
    if has_entries {
        entries.sort_unstable();
        if let Err(e) = self.disk_index.push_sorted_segment(&entries) {
            tracing::error!(
                "seed_checkpoint: disk write failed ({e:#}), falling back to age-0 (may cause memory pressure)"
            );
            self.ages[0].append(entries, checkpoint_height);
        } else {
            // Release the (potentially large) buffer before publishing the height.
            drop(entries);
        }
    }
    self.contiguous_length
        .store(checkpoint_height, Ordering::Release);
    super::set_gc_fence(checkpoint_height);
}
/// Forwards to `DiskIndex::alloc_seg` and returns its `(usize, PathBuf)` result.
/// NOTE(review): presumably a segment id and the path to write it at — confirm in disk_index.
pub fn alloc_seed_seg(&self) -> (usize, std::path::PathBuf) {
self.disk_index.alloc_seg()
}
/// Registers an already-written segment with the disk index, then sets
/// `contiguous_length` to `checkpoint_height` and moves the GC fence there.
pub fn finalize_seed(&self, seg: super::disk_segment::DiskSegment, checkpoint_height: i32) {
self.disk_index.register_seg(seg);
// Release store: publish the height only after the segment is registered.
self.contiguous_length
.store(checkpoint_height, Ordering::Release);
super::set_gc_fence(checkpoint_height);
}
/// Rolls back everything recorded at height `since` or later in the mutable
/// ages and lowers `contiguous_length` to at most `since - 1`.
///
/// Only the first `K_MUTABLE_AGES` ages are touched; later ages and the disk
/// index are left as they are.
///
/// The subtraction saturates so that `since == i32::MIN` cannot overflow
/// (which would panic in debug builds and wrap to `i32::MAX` in release,
/// turning the `fetch_min` into a no-op).
pub fn erase_since(&self, since: i32) {
    for age in self.ages.iter().take(K_MUTABLE_AGES) {
        age.erase_since(since);
    }
    self.contiguous_length
        .fetch_min(since.saturating_sub(1), Ordering::Relaxed);
}
/// Collects every live entry from disk and memory.
///
/// Snapshots the disk segment list and the runs of every age while holding the
/// segment read lock, loads all entries, sorts them, keeps only the first entry
/// of each run of equal keys, and returns those that are adds. Disk segments
/// that fail to read are skipped with a warning.
pub fn scan_all_live(&self) -> Vec<OutputKV> {
    let (disk_segs, mem_snapshots) = {
        // Hold the segment lock while snapshotting memory so both views are taken together.
        let segments = self.disk_index.segments.read();
        let disk = segments.clone();
        let mem: Vec<_> = self
            .ages
            .iter()
            .rev()
            .map(|age| age.snapshot_runs())
            .collect();
        (disk, mem)
    };

    let mut live: Vec<OutputKV> = Vec::new();
    for seg in &disk_segs {
        match seg.read_all_entries() {
            Ok(entries) => live.extend_from_slice(&entries),
            Err(err) => {
                tracing::warn!("scan_all_live: skipping segment {:?}: {err}", seg.path);
            }
        }
    }
    for snapshot in &mem_snapshots {
        for run in snapshot.iter() {
            live.extend_from_slice(&run.entries);
        }
    }

    live.sort_unstable();
    // After sorting, the first entry of each key group decides the key's fate.
    live.dedup_by_key(|entry| entry.key);
    live.retain(|entry| entry.is_add());
    live
}
/// Collects the live entries as of `max_height`.
///
/// First asks the disk index to compact for a checkpoint, then snapshots disk
/// segments and memory runs under the segment read lock. Entries with a height
/// above `max_height` are discarded; the rest are sorted, reduced to the first
/// entry of each run of equal keys, and filtered to adds.
///
/// A read error on a segment is logged and ends the scan of that segment; the
/// entries read so far are kept.
pub fn scan_live_at_height(&self, max_height: i32) -> Vec<OutputKV> {
    self.disk_index.compact_for_checkpoint_sync();
    let (disk_segs, mem_snapshots) = {
        let segments = self.disk_index.segments.read();
        let disk = segments.clone();
        let mem: Vec<_> = self
            .ages
            .iter()
            .rev()
            .map(|age| age.snapshot_runs())
            .collect();
        (disk, mem)
    };

    let mut candidates: Vec<OutputKV> = Vec::new();
    for seg in &disk_segs {
        let mut reader = seg.stream();
        loop {
            let entry = match reader.advance() {
                Ok(Some(entry)) => entry,
                Ok(None) => break,
                Err(err) => {
                    tracing::warn!(
                        "scan_live_at_height: read error on segment {:?}: {err}",
                        seg.path
                    );
                    break;
                }
            };
            if entry.height <= max_height {
                candidates.push(entry);
            }
        }
    }
    for snapshot in &mem_snapshots {
        for run in snapshot.iter() {
            for entry in &run.entries {
                if entry.height <= max_height {
                    candidates.push(*entry);
                }
            }
        }
    }

    candidates.sort_unstable();
    // After sorting, the first entry of each key group decides the key's fate.
    candidates.dedup_by_key(|entry| entry.key);
    candidates.retain(|entry| entry.is_add());
    candidates
}
/// Builds a streaming view of the live entries as of `max_height`.
///
/// Memory entries at or below `max_height` are copied and sorted up front;
/// disk segments are opened as streaming readers and primed with their first
/// entry. The returned [`CheckpointStream`] merges both sources lazily.
///
/// # Errors
/// Propagates the first error hit while priming a segment reader.
pub fn iter_live_at_height(&self, max_height: i32) -> anyhow::Result<CheckpointStream> {
    self.disk_index.compact_for_checkpoint_sync();
    let (disk_segs, mem_snapshots) = {
        let segments = self.disk_index.segments.read();
        let disk = segments.clone();
        let mem: Vec<_> = self
            .ages
            .iter()
            .rev()
            .map(|age| age.snapshot_runs())
            .collect();
        (disk, mem)
    };

    let mut mem: Vec<OutputKV> = Vec::new();
    for snapshot in &mem_snapshots {
        for run in snapshot.iter() {
            for entry in &run.entries {
                if entry.height <= max_height {
                    mem.push(*entry);
                }
            }
        }
    }
    mem.sort_unstable();

    let mut readers: Vec<super::disk_segment::SegmentReader> =
        disk_segs.iter().map(|seg| seg.stream()).collect();
    // Prime one head per reader; stops at the first failing reader.
    let mut heads: Vec<Option<OutputKV>> = Vec::with_capacity(readers.len());
    for reader in readers.iter_mut() {
        let first = reader.advance()?;
        heads.push(first);
    }

    Ok(CheckpointStream {
        mem,
        mem_pos: 0,
        readers,
        heads,
        max_height,
        last_key: None,
    })
}
}
/// Lazy merge over sorted memory entries and per-segment disk readers,
/// produced by `UtxoIndex::iter_live_at_height`.
pub struct CheckpointStream {
/// Sorted memory entries, already filtered to `height <= max_height`.
mem: Vec<OutputKV>,
/// Index of the next unconsumed element of `mem`.
mem_pos: usize,
/// One streaming reader per disk segment.
readers: Vec<super::disk_segment::SegmentReader>,
/// Current head entry of each reader (`None` once that reader is exhausted); parallel to `readers`.
heads: Vec<Option<OutputKV>>,
/// Entries above this height are skipped.
max_height: i32,
/// Key of the most recently accepted entry, used to skip later entries with the same key.
last_key: Option<[u8; 36]>,
}
impl CheckpointStream {
    /// Returns the next live entry in sorted order, or `Ok(None)` at the end.
    ///
    /// Entries above `max_height` are skipped. Of the remaining entries only
    /// the first one seen for each key is considered, and it is returned only
    /// if it is an add.
    ///
    /// # Errors
    /// Propagates read errors from the underlying segment readers.
    pub fn next_live(&mut self) -> anyhow::Result<Option<OutputKV>> {
        while let Some(entry) = self.pick_min()? {
            let too_new = entry.height > self.max_height;
            if too_new || self.last_key == Some(entry.key) {
                continue;
            }
            self.last_key = Some(entry.key);
            if entry.is_add() {
                return Ok(Some(entry));
            }
        }
        Ok(None)
    }

    /// Pops the smallest pending entry across the memory cursor and all disk
    /// heads. On ties the memory entry wins, then the lowest-indexed reader.
    fn pick_min(&mut self) -> anyhow::Result<Option<OutputKV>> {
        // `(entry, Some(reader index))` for a disk head, `(entry, None)` for memory.
        let mut winner: Option<(OutputKV, Option<usize>)> =
            self.mem.get(self.mem_pos).map(|&entry| (entry, None));

        for (slot, head) in self.heads.iter().enumerate() {
            let Some(candidate) = *head else {
                continue;
            };
            let beats_current = match winner {
                None => true,
                Some((current, _)) => candidate < current,
            };
            if beats_current {
                winner = Some((candidate, Some(slot)));
            }
        }

        match winner {
            None => Ok(None),
            Some((entry, Some(slot))) => {
                // Refill the head we just consumed.
                self.heads[slot] = self.readers[slot].advance()?;
                Ok(Some(entry))
            }
            Some((entry, None)) => {
                self.mem_pos += 1;
                Ok(Some(entry))
            }
        }
    }
}
/// Asks the compacter workers to exit when the index is dropped.
/// The workers are signalled but not joined here.
impl Drop for UtxoIndex {
fn drop(&mut self) {
self.compacter.shutdown();
}
}
#[cfg(test)]
mod tests {
use super::super::types::OutputKV;
use super::*;
// Builds a 36-byte key that is all zeroes except for the first byte.
fn make_key(n: u8) -> [u8; 36] {
let mut k = [0u8; 36];
k[0] = n;
k
}
// An appended add is found by `lookup_key`; an unknown key is not.
#[test]
fn test_append_and_query() {
let idx = UtxoIndex::new_for_test();
let k = make_key(1);
// The pin is bound to a name so it stays alive for the rest of the test.
let _pin = idx.append(vec![OutputKV::new_add(k, 100, 42)], 100);
assert_eq!(idx.lookup_key(&k), Some(42));
assert_eq!(idx.lookup_key(&make_key(2)), None);
}
// `batch_query` resolves keys appended at different heights.
#[test]
fn test_batch_query() {
let idx = UtxoIndex::new_for_test();
let k1 = make_key(1);
let k2 = make_key(2);
let _p1 = idx.append(vec![OutputKV::new_add(k1, 100, 10)], 100);
let _p2 = idx.append(vec![OutputKV::new_add(k2, 101, 20)], 101);
// `OutputId::MAX` marks a slot as unresolved.
let mut ids = [OutputId::MAX; 2];
idx.batch_query(&[k1, k2], &mut ids, i32::MAX);
assert_eq!(ids[0], 10);
assert_eq!(ids[1], 20);
}
// `contiguous_length` starts at -1 and follows the appended height.
#[test]
fn test_contiguous_length() {
let idx = UtxoIndex::new_for_test();
assert_eq!(idx.contiguous_length(), -1);
let k = make_key(1);
let _pin = idx.append(vec![OutputKV::new_add(k, 50, 1)], 50);
assert_eq!(idx.contiguous_length(), 50);
}
// `erase_since(75)` keeps the entry at height 50 and removes the one at height 100.
#[test]
fn test_erase_since() {
let idx = UtxoIndex::new_for_test();
let k1 = make_key(1);
let k2 = make_key(2);
let _p1 = idx.append(vec![OutputKV::new_add(k1, 50, 1)], 50);
let _p2 = idx.append(vec![OutputKV::new_add(k2, 100, 2)], 100);
idx.erase_since(75);
assert_eq!(idx.lookup_key(&k1), Some(1));
assert_eq!(idx.lookup_key(&k2), None);
}
}