use super::database::UtxoDatabase;
use super::types::{output_key_to_outpoint, IdCodec, OutputId};
use crate::storage::database::Tree;
use crate::storage::disk_utxo::outpoint_to_key;
use anyhow::Result;
use blvm_muhash::{serialize_coin_for_muhash, MuHash3072};
use blvm_protocol::types::{SharedByteString, UTXO};
#[cfg(target_os = "linux")]
use libc;
#[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
use libmimalloc_sys;
use std::sync::Arc;
use tracing::{info, warn};
/// Tree name backing checkpoint slot A (selected for even slot numbers).
pub const CKPT_TREE_A: &str = "ibd_utxos_ckpt_a";
/// Tree name backing checkpoint slot B (selected for odd slot numbers).
pub const CKPT_TREE_B: &str = "ibd_utxos_ckpt_b";

/// Map a checkpoint slot number to the name of the tree that stores it.
///
/// Only the lowest bit of `slot` is significant: even values select
/// [`CKPT_TREE_A`], odd values select [`CKPT_TREE_B`].
pub fn ckpt_tree_for_slot(slot: u8) -> &'static str {
    match slot % 2 {
        0 => CKPT_TREE_A,
        _ => CKPT_TREE_B,
    }
}
/// Return the checkpoint slot opposite to `active`; the result is always `0` or `1`.
///
/// Only the lowest bit of `active` is considered, so `2` behaves like `0`,
/// `3` like `1`, and so on.
pub fn ckpt_inactive_slot(active: u8) -> u8 {
    (active & 1) ^ 1
}
/// Export every live UTXO reported by `db.scan_live()` into `tree` and return
/// the MuHash3072 accumulator over the exported coins.
///
/// First calls `db.wait_for_height(tip_height)`. It presumably blocks until the
/// engine has reached that height; confirm against `UtxoDatabase`.
///
/// The live-entry list is freed before returning. The allocator is then asked
/// to release freed memory to the OS: `mi_collect` when the `mimalloc` feature
/// is enabled on non-Windows targets, `malloc_trim` on Linux. This cleanup runs
/// whether the export succeeded or failed.
///
/// # Errors
/// Returns whatever error the underlying export produced (detail fetch,
/// serialisation, or bulk load into `tree`).
pub fn watermark_export(db: &UtxoDatabase, tree: &dyn Tree, tip_height: i32) -> Result<MuHash3072> {
    db.wait_for_height(tip_height);
    // Keep the (possibly very large) live list scoped so it is released before
    // the allocator trim below.
    let outcome = {
        let live = db.scan_live();
        write_live_kvs(db, tree, &live, tip_height)
    };
    #[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
    // SAFETY: `mi_collect` only takes a `force` flag; it has no pointer
    // arguments or caller-side preconditions.
    unsafe {
        libmimalloc_sys::mi_collect(true);
    }
    #[cfg(target_os = "linux")]
    // SAFETY: `malloc_trim` only takes a padding size; it has no pointer
    // arguments or caller-side preconditions.
    unsafe {
        libc::malloc_trim(0);
    }
    outcome
}
/// Timed wrapper around [`watermark_export`] that logs the elapsed wall-clock
/// time and height when the export succeeds.
///
/// # Errors
/// Returns the error from [`watermark_export`] unchanged; nothing is logged in
/// that case.
pub fn run_watermark_export(
    db: &UtxoDatabase,
    tree: &Arc<dyn Tree>,
    tip_height: i32,
) -> Result<MuHash3072> {
    let started = std::time::Instant::now();
    watermark_export(db, tree.as_ref(), tip_height).map(|muhash| {
        info!(
            "IBD engine watermark export finished in {:.1}s (height={})",
            started.elapsed().as_secs_f64(),
            tip_height
        );
        muhash
    })
}
/// Replace the contents of `tree` with the UTXO set as of `checkpoint_height`,
/// streaming entries from `db` in bounded chunks.
///
/// Returns the MuHash3072 accumulator over the written coins and the number of
/// UTXOs written.
///
/// The stream is opened *before* `tree` is cleared, so failing to open it
/// leaves `tree` untouched. A failure after the clear leaves `tree` empty or
/// only partially written. Such a tree should presumably not be treated as a
/// valid checkpoint by callers.
///
/// After the export the stream is dropped, and the allocator is asked to
/// release freed memory to the OS. This happens on both success and failure,
/// matching `watermark_export`.
///
/// # Errors
/// Propagates errors from opening the stream, clearing `tree`, or the export.
pub fn run_checkpoint_export_replace(
    db: &UtxoDatabase,
    tree: &Arc<dyn Tree>,
    checkpoint_height: i32,
) -> Result<(MuHash3072, usize)> {
    let t = std::time::Instant::now();
    // Open the stream first so an error here does not wipe the existing tree.
    let mut stream = db.iter_live_at_height(checkpoint_height)?;
    tree.clear()?;
    // Capture the outcome rather than `?`-ing immediately, so the memory
    // cleanup below also runs when the export fails part-way through.
    let outcome = write_live_kvs_streaming(db, tree.as_ref(), &mut stream, checkpoint_height);
    let elapsed = t.elapsed().as_secs_f64();
    drop(stream);
    #[cfg(all(not(target_os = "windows"), feature = "mimalloc"))]
    // SAFETY: `mi_collect` only takes a `force` flag; it has no pointer
    // arguments or caller-side preconditions.
    unsafe {
        libmimalloc_sys::mi_collect(true);
    }
    #[cfg(target_os = "linux")]
    // SAFETY: `malloc_trim` only takes a padding size; it has no pointer
    // arguments or caller-side preconditions.
    unsafe {
        libc::malloc_trim(0);
    }
    let (muhash, count) = outcome?;
    info!(
        "IBD engine checkpoint export (replace) in {:.1}s (height={}, utxos={})",
        elapsed, checkpoint_height, count,
    );
    Ok((muhash, count))
}
/// Drain `stream` in chunks of up to 500 000 entries and bulk-load the
/// corresponding UTXOs into `tree`.
///
/// For each chunk, the entries' coin details are fetched from `db`. Then, for
/// every entry that has a detail:
/// - the output key is converted to an outpoint and then to the tree key via
///   `outpoint_to_key`;
/// - the coin's `serialize_coin_for_muhash` preimage is added to a MuHash3072
///   accumulator;
/// - a `UTXO` is bincode-serialised as the value.
///
/// Each chunk's key/value pairs are sorted by key before `bulk_load_sorted_kv`.
/// If `db.fetch` reports a different count than requested, a warning is logged.
/// Entries are paired with details by position, and any entry without a
/// corresponding detail is skipped.
///
/// `tip_height` is used only for logging. Returns the accumulator and the
/// number of UTXOs written.
fn write_live_kvs_streaming(
    db: &UtxoDatabase,
    tree: &dyn Tree,
    stream: &mut super::index::CheckpointStream,
    tip_height: i32,
) -> Result<(MuHash3072, usize)> {
    const CHUNK_SIZE: usize = 500_000;
    let mut acc = MuHash3072::new();
    let mut written = 0usize;
    // Reused scratch buffer for bincode output.
    let mut encoded: Vec<u8> = Vec::with_capacity(200);
    info!(
        "IBD engine checkpoint export (streaming): writing UTXOs at height {}",
        tip_height
    );
    let mut batch: Vec<super::types::OutputKV> = Vec::with_capacity(CHUNK_SIZE);
    loop {
        batch.clear();
        // Fill until the batch is full or the stream reports no more entries.
        while batch.len() < CHUNK_SIZE {
            let Some(entry) = stream.next_live()? else {
                break;
            };
            batch.push(entry);
        }
        if batch.is_empty() {
            break;
        }

        let ids: Vec<OutputId> = batch.iter().map(|kv| kv.id).collect();
        let mut details = Vec::with_capacity(batch.len());
        let fetched = db.fetch(&ids, &mut details)?;
        if fetched != batch.len() {
            warn!(
                "checkpoint export chunk: fetched {} details but expected {}",
                fetched,
                batch.len()
            );
        }

        let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(batch.len());
        // `zip` stops at the shorter side, so entries without a fetched
        // detail are skipped.
        for (kv, detail) in batch.iter().zip(details.iter()) {
            let outpoint = output_key_to_outpoint(&kv.key);
            let key = outpoint_to_key(&outpoint);
            let header = &detail.header;
            let script = detail.script.as_slice();

            acc.insert_mut(&serialize_coin_for_muhash(
                &outpoint.hash,
                outpoint.index,
                header.height as u32,
                header.is_coinbase(),
                header.amount,
                script,
            ));

            let coin = UTXO {
                value: header.amount,
                script_pubkey: SharedByteString::from(script),
                height: header.height as u64,
                is_coinbase: header.is_coinbase(),
            };
            encoded.clear();
            bincode::serialize_into(&mut encoded, &coin)
                .map_err(|e| anyhow::anyhow!("UTXO serialize: {e}"))?;
            pairs.push((key.to_vec(), encoded.clone()));
        }

        pairs.sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb));
        written += pairs.len();
        tree.bulk_load_sorted_kv(&pairs)?;
    }
    info!(
        "IBD engine checkpoint export complete: wrote {} UTXOs",
        written
    );
    Ok((acc, written))
}
/// Export a pre-collected list of live outputs into `tree`, processing it in
/// chunks of up to 2 000 000 entries. Returns the MuHash3072 accumulator over
/// the written coins.
///
/// Entries whose `id` is `0` are skipped. `0` is presumably a "no output"
/// sentinel; confirm against `UtxoDatabase::scan_live`. For each remaining entry
/// that has a fetched detail:
/// - the UTXO is bincode-serialised under its `outpoint_to_key` key;
/// - its `serialize_coin_for_muhash` preimage is added to the accumulator.
///
/// Each chunk is sorted by key and handed to `bulk_load_sorted_kv`.
///
/// If `db.fetch` returns a different count than requested, a warning is
/// logged. Entries are paired with details by position, and any entry without
/// a corresponding detail is skipped.
///
/// # Errors
/// Propagates errors from `db.fetch`, bincode serialisation, and
/// `tree.bulk_load_sorted_kv`.
fn write_live_kvs(
    db: &UtxoDatabase,
    tree: &dyn Tree,
    live_kvs: &[super::types::OutputKV],
    tip_height: i32,
) -> Result<MuHash3072> {
    const CHUNK_SIZE: usize = 2_000_000;
    let total = live_kvs.len();
    info!(
        "IBD engine checkpoint export: {} live UTXOs at height {}",
        total, tip_height
    );
    let mut muhash = MuHash3072::new();
    let mut total_written = 0usize;
    // Reused scratch buffer for bincode output.
    let mut ser_buf: Vec<u8> = Vec::with_capacity(200);
    for chunk in live_kvs.chunks(CHUNK_SIZE) {
        // Filter once and derive the id list from the survivors, so the two
        // are index-aligned by construction.
        let chunk_kvs: Vec<&super::types::OutputKV> =
            chunk.iter().filter(|kv| kv.id != 0).collect();
        let chunk_ids: Vec<OutputId> = chunk_kvs.iter().map(|kv| kv.id).collect();
        let mut details = Vec::with_capacity(chunk_kvs.len());
        let fetched = db.fetch(&chunk_ids, &mut details)?;
        if fetched != chunk_kvs.len() {
            warn!(
                "IBD engine export chunk: fetched {} details but expected {} — engine/table mismatch",
                fetched,
                chunk_kvs.len()
            );
        }
        let mut kv_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(chunk_kvs.len());
        // `zip` stops at the shorter side, so entries without a fetched
        // detail are skipped.
        for (kv, detail) in chunk_kvs.iter().zip(details.iter()) {
            let op = output_key_to_outpoint(&kv.key);
            let rocks_key = outpoint_to_key(&op);
            let utxo = UTXO {
                value: detail.header.amount,
                script_pubkey: SharedByteString::from(detail.script.as_slice()),
                height: detail.header.height as u64,
                is_coinbase: detail.header.is_coinbase(),
            };
            let preimage = serialize_coin_for_muhash(
                &op.hash,
                op.index,
                detail.header.height as u32,
                detail.header.is_coinbase(),
                detail.header.amount,
                detail.script.as_slice(),
            );
            muhash.insert_mut(&preimage);
            ser_buf.clear();
            bincode::serialize_into(&mut ser_buf, &utxo)
                .map_err(|e| anyhow::anyhow!("UTXO serialize: {e}"))?;
            kv_pairs.push((rocks_key.to_vec(), ser_buf.clone()));
        }
        kv_pairs.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        total_written += kv_pairs.len();
        tree.bulk_load_sorted_kv(&kv_pairs)?;
        // Per-chunk buffers are released here at end of scope before the next chunk.
    }
    info!(
        "IBD engine checkpoint export complete: wrote {} UTXOs",
        total_written
    );
    Ok(muhash)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::ibd_engine::UtxoDatabase;
    use blvm_protocol::{
        Block, BlockHeader, OutPoint, Transaction, TransactionInput, TransactionOutput,
    };
    use std::collections::HashMap;
    use std::sync::{Mutex, MutexGuard};
    use tempfile::NamedTempFile;

    /// In-memory `Tree` backed by a mutex-guarded `HashMap`, used to inspect
    /// what an export wrote.
    struct MockTree {
        data: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
    }

    impl MockTree {
        fn new() -> Self {
            MockTree {
                data: Mutex::new(HashMap::new()),
            }
        }

        /// Lock the backing map; panics if a previous holder panicked.
        fn locked(&self) -> MutexGuard<'_, HashMap<Vec<u8>, Vec<u8>>> {
            self.data.lock().unwrap()
        }

        fn get_value(&self, key: &[u8]) -> Option<Vec<u8>> {
            self.locked().get(key).cloned()
        }
    }

    impl Tree for MockTree {
        fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
            self.locked().insert(key.to_vec(), value.to_vec());
            Ok(())
        }

        fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
            Ok(self.locked().get(key).cloned())
        }

        fn remove(&self, key: &[u8]) -> Result<()> {
            self.locked().remove(key);
            Ok(())
        }

        fn contains_key(&self, key: &[u8]) -> Result<bool> {
            Ok(self.locked().contains_key(key))
        }

        fn clear(&self) -> Result<()> {
            self.locked().clear();
            Ok(())
        }

        fn len(&self) -> Result<usize> {
            Ok(self.locked().len())
        }

        fn iter(&self) -> Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + '_> {
            // Copy the entries out so the lock is released before iteration.
            let entries: Vec<Result<(Vec<u8>, Vec<u8>)>> = self
                .locked()
                .iter()
                .map(|(k, v)| Ok((k.clone(), v.clone())))
                .collect();
            Box::new(entries.into_iter())
        }

        fn batch(&self) -> Result<Box<dyn crate::storage::database::BatchWriter + '_>> {
            let pending = MockBatch {
                tree: self,
                ops: Vec::new(),
            };
            Ok(Box::new(pending))
        }
    }

    /// Buffered batch; `commit` applies its operations to the parent
    /// `MockTree` in the order they were recorded.
    struct MockBatch<'a> {
        tree: &'a MockTree,
        // `Some(value)` records a put, `None` records a delete.
        ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
    }

    impl crate::storage::database::BatchWriter for MockBatch<'_> {
        fn put(&mut self, key: &[u8], value: &[u8]) {
            self.ops.push((key.to_vec(), Some(value.to_vec())));
        }

        fn delete(&mut self, key: &[u8]) {
            self.ops.push((key.to_vec(), None));
        }

        fn commit(self: Box<Self>) -> Result<()> {
            let MockBatch { tree, ops } = *self;
            let mut map = tree.locked();
            for (key, op) in ops {
                if let Some(value) = op {
                    map.insert(key, value);
                } else {
                    map.remove(&key);
                }
            }
            Ok(())
        }

        fn len(&self) -> usize {
            self.ops.len()
        }
    }

    /// Build a single-output coinbase-style transaction paying `value`.
    fn make_coinbase(value: i64) -> Transaction {
        let null_prevout = OutPoint {
            hash: [0u8; 32],
            index: 0xFFFF_FFFF,
        };
        let input = TransactionInput {
            prevout: null_prevout,
            sequence: 0xFFFF_FFFF,
            script_sig: vec![],
        };
        let output = TransactionOutput {
            value,
            script_pubkey: vec![0x76, 0xa9, 0x14, 0xde],
        };
        Transaction {
            version: 1,
            inputs: vec![input].into(),
            outputs: vec![output].into(),
            lock_time: 0,
        }
    }

    /// Wrap `txs` in a block with an all-zero header.
    fn make_block(txs: Vec<Transaction>) -> Block {
        let header = BlockHeader {
            version: 1,
            prev_block_hash: [0u8; 32],
            merkle_root: [0u8; 32],
            timestamp: 0,
            bits: 0,
            nonce: 0,
        };
        Block {
            header,
            transactions: txs.into_boxed_slice(),
        }
    }

    #[test]
    fn test_watermark_export_writes_utxos() {
        let db_file = NamedTempFile::new().unwrap();
        let db = UtxoDatabase::open(db_file.path(), 0).unwrap();
        let tree = Arc::new(MockTree::new());

        // Append one block containing a single coinbase at height 100.
        let txid = [1u8; 32];
        let block = make_block(vec![make_coinbase(5_000_000_000)]);
        let _pin = db.append(&block, &[txid], 100).unwrap();

        let muhash = watermark_export(&db, tree.as_ref(), 100).unwrap();

        let key = outpoint_to_key(&OutPoint {
            hash: txid,
            index: 0,
        });
        assert!(
            tree.get_value(&key).is_some(),
            "coinbase UTXO should have been written to tree"
        );
        assert_ne!(
            muhash.serialize_running_state(),
            MuHash3072::new().serialize_running_state(),
            "MuHash should have at least one entry"
        );
    }
}