use crate::storage::database::Tree;
use anyhow::Result;
/// Maximum number of put/delete operations written through a single storage
/// batch; `flush_batch_to_disk` splits its input into chunks of this size and
/// commits each chunk separately.
pub(crate) const MAX_BATCH_OPS: usize = 200_000;
/// NOTE(review): not referenced in the visible part of this file. Presumably the
/// minimum age, in blocks, before a cached entry may be evicted — confirm.
const EVICT_MIN_AGE_BLOCKS: u64 = 100;
/// NOTE(review): not referenced in the visible part of this file. Presumably the
/// age, in blocks, beyond which an entry counts as "very old" for eviction — confirm.
const EVICT_VERY_OLD_BLOCKS: u64 = 10_000;
// `dead_code` is allowed because this constant may be unused in some builds.
// NOTE(review): presumably a dust limit in the smallest currency unit used when
// choosing eviction candidates — confirm against the eviction code.
#[allow(dead_code)]
const EVICT_DUST_THRESHOLD: i64 = 546;
use blvm_protocol::transaction::is_coinbase;
use blvm_protocol::types::{Block, Hash, OutPoint, UTXO};
use rustc_hash::{FxHashMap, FxHashSet};
use std::sync::Arc;
use tracing::debug;
/// Fixed-width storage key for an outpoint: bytes `0..32` are the transaction
/// hash, bytes `32..40` are the output index as a big-endian `u64`
/// (see `outpoint_to_key` / `key_to_outpoint`).
pub type OutPointKey = [u8; 40];
/// A pending change for one key: `Some(utxo)` is written to storage, `None`
/// deletes the key (see `flush_batch_to_disk`).
type PendingValue = Option<Arc<UTXO>>;
/// Serializes an [`OutPoint`] into its fixed-width 40-byte storage key.
///
/// Layout: bytes `0..32` carry the transaction hash verbatim; bytes `32..40`
/// carry the output index widened to `u64` and written big-endian.
#[inline]
pub fn outpoint_to_key(outpoint: &OutPoint) -> OutPointKey {
    let index_bytes = (outpoint.index as u64).to_be_bytes();
    let mut key: OutPointKey = [0u8; 40];
    {
        // Fill the two halves of the key independently.
        let (hash_part, index_part) = key.split_at_mut(32);
        hash_part.copy_from_slice(&outpoint.hash);
        index_part.copy_from_slice(&index_bytes);
    }
    key
}
/// Decodes a 40-byte storage key back into an [`OutPoint`].
///
/// Inverse of `outpoint_to_key`: the first 32 bytes become the hash and the
/// trailing 8 bytes are read as a big-endian `u64`, then narrowed to `u32`
/// (only the low 32 bits of the stored index are kept).
#[inline]
pub fn key_to_outpoint(key: &OutPointKey) -> OutPoint {
    let (hash_part, index_part) = key.split_at(32);

    let mut hash = [0u8; 32];
    hash.copy_from_slice(hash_part);

    let mut index_bytes = [0u8; 8];
    index_bytes.copy_from_slice(index_part);

    OutPoint {
        hash,
        index: u64::from_be_bytes(index_bytes) as u32,
    }
}
pub(crate) fn load_keys_from_disk(
disk: Arc<dyn Tree>,
mut keys: Vec<OutPointKey>,
codec: crate::storage::utxo_value_codec::ValueCodec,
) -> Result<(FxHashMap<OutPointKey, UTXO>, Vec<OutPointKey>)> {
if keys.is_empty() {
return Ok((FxHashMap::default(), Vec::new()));
}
keys.sort_unstable();
let mut key_refs: Vec<&[u8]> = Vec::with_capacity(keys.len());
for k in &keys {
key_refs.push(k.as_slice());
}
#[cfg(feature = "heed3")]
if codec == crate::storage::utxo_value_codec::ValueCodec::Rkyv {
if let Some(heed3_tree) = disk.as_heed3_tree() {
let rtxn = heed3_tree.env().read_txn()?;
let slices = heed3_tree.get_many_heed3(&key_refs, &rtxn)?;
let mut result = FxHashMap::with_capacity_and_hasher(keys.len(), Default::default());
for (key, opt_bytes) in keys.iter().zip(slices) {
if let Some(bytes) = opt_bytes {
if let Ok(archived) = crate::storage::rkyv_codec::access_utxo(bytes) {
result.insert(
*key,
crate::storage::rkyv_codec::utxo_from_archived(archived),
);
}
}
}
return Ok((result, keys));
}
}
let values = disk.get_many_no_cache(&key_refs)?;
let mut result = FxHashMap::with_capacity_and_hasher(keys.len(), Default::default());
for (key, value) in keys.iter().zip(values.into_iter()) {
if let Some(data) = value {
if let Ok(utxo) = crate::storage::utxo_value_codec::decode_utxo_with_codec(codec, &data)
{
result.insert(*key, utxo);
}
}
}
Ok((result, keys))
}
/// Writes the storage key of every input spent by `block` into `keys_out`.
///
/// `keys_out` is cleared first. Coinbase transactions are skipped; all other
/// transactions contribute one key per input, in block order. Duplicate
/// outpoints are not removed.
#[inline]
pub fn block_input_keys_into(block: &Block, keys_out: &mut Vec<OutPointKey>) {
    // Count the inputs up front so the output buffer grows at most once.
    let mut expected = 0usize;
    for tx in block.transactions.iter() {
        if !is_coinbase(tx) {
            expected += tx.inputs.len();
        }
    }

    keys_out.clear();
    keys_out.reserve(expected);

    for tx in block.transactions.iter().filter(|tx| !is_coinbase(tx)) {
        keys_out.extend(
            tx.inputs
                .iter()
                .map(|input| outpoint_to_key(&input.prevout)),
        );
    }
}
/// Returns the distinct input keys spent across `blocks`, in first-seen order.
///
/// Coinbase transactions are skipped. An outpoint that appears more than once
/// (within one block or across blocks) is emitted only the first time.
pub(crate) fn block_input_keys_batch(blocks: &[&Block]) -> Vec<OutPointKey> {
    // Upper bound on the number of keys: every input of every non-coinbase tx.
    let capacity: usize = blocks
        .iter()
        .flat_map(|block| block.transactions.iter())
        .filter(|tx| !is_coinbase(tx))
        .map(|tx| tx.inputs.len())
        .sum();

    let mut seen = FxHashSet::with_capacity_and_hasher(capacity, Default::default());
    let mut keys = Vec::with_capacity(capacity);

    let spent_inputs = blocks
        .iter()
        .flat_map(|block| block.transactions.iter())
        .filter(|tx| !is_coinbase(tx))
        .flat_map(|tx| tx.inputs.iter());
    for input in spent_inputs {
        let key = outpoint_to_key(&input.prevout);
        // `insert` returns true only for keys not seen before.
        if seen.insert(key) {
            keys.push(key);
        }
    }
    keys
}
/// Buffer-reusing variant of `block_input_keys_batch`.
///
/// Clears `keys_out` and `seen`, then fills `keys_out` with the distinct input
/// keys spent across `blocks` in first-seen order (coinbase transactions are
/// skipped). `seen` is scratch space for de-duplication and ends up holding the
/// same set of keys.
pub(crate) fn block_input_keys_batch_into(
    blocks: &[&Block],
    keys_out: &mut Vec<OutPointKey>,
    seen: &mut FxHashSet<OutPointKey>,
) {
    // Upper bound on the number of keys, used to size the output buffer once.
    let mut expected = 0usize;
    for block in blocks {
        for tx in block.transactions.iter() {
            if !is_coinbase(tx) {
                expected += tx.inputs.len();
            }
        }
    }

    keys_out.clear();
    keys_out.reserve(expected);
    seen.clear();

    let spent_inputs = blocks
        .iter()
        .flat_map(|block| block.transactions.iter())
        .filter(|tx| !is_coinbase(tx))
        .flat_map(|tx| tx.inputs.iter());
    for input in spent_inputs {
        let key = outpoint_to_key(&input.prevout);
        if seen.insert(key) {
            keys_out.push(key);
        }
    }
}
/// Same as `block_input_keys_batch_into`, but for blocks held behind `Arc`.
///
/// Clears `keys_out` and `seen`, then appends each distinct input key spent by
/// the non-coinbase transactions of `blocks`, in first-seen order.
pub(crate) fn block_input_keys_batch_into_arc(
    blocks: &[Arc<Block>],
    keys_out: &mut Vec<OutPointKey>,
    seen: &mut FxHashSet<OutPointKey>,
) {
    // Total number of inputs across all non-coinbase transactions.
    let expected: usize = blocks
        .iter()
        .flat_map(|block| block.transactions.iter())
        .filter(|tx| !is_coinbase(tx))
        .map(|tx| tx.inputs.len())
        .sum();

    keys_out.clear();
    keys_out.reserve(expected);
    seen.clear();

    for block in blocks.iter() {
        for tx in block.transactions.iter().filter(|tx| !is_coinbase(tx)) {
            keys_out.extend(
                tx.inputs
                    .iter()
                    .map(|input| outpoint_to_key(&input.prevout))
                    // Keep a key only the first time it is inserted into `seen`.
                    .filter(|key| seen.insert(*key)),
            );
        }
    }
}
pub fn block_input_keys_into_filtered_with_tx_ids(
block: &Block,
tx_ids: &[Hash],
keys_out: &mut Vec<OutPointKey>,
) -> usize {
let est: usize = block
.transactions
.iter()
.filter(|tx| !is_coinbase(tx))
.map(|tx| tx.inputs.len())
.sum();
keys_out.clear();
keys_out.reserve(est);
let mut filtered = 0usize;
for (spending_idx, tx) in block.transactions.iter().enumerate() {
if is_coinbase(tx) {
continue;
}
for input in tx.inputs.iter() {
let h = input.prevout.hash;
let funded_by_prior_non_cb = (1..spending_idx).any(|j| tx_ids[j] == h);
if funded_by_prior_non_cb {
filtered += 1;
} else {
keys_out.push(outpoint_to_key(&input.prevout));
}
}
}
filtered
}
/// Computes the transaction ids of `block` into `tx_ids_buf`, then collects the
/// filtered input keys into `keys_out`.
///
/// Both buffers are caller-owned so they can be reused between calls. Returns
/// the count reported by `block_input_keys_into_filtered_with_tx_ids`, i.e. the
/// number of inputs that were skipped.
pub fn block_input_keys_and_tx_ids_filtered(
    block: &Block,
    tx_ids_buf: &mut Vec<Hash>,
    keys_out: &mut Vec<OutPointKey>,
) -> usize {
    blvm_protocol::block::compute_block_tx_ids_into(block, tx_ids_buf);
    let tx_ids: &[Hash] = tx_ids_buf.as_slice();
    block_input_keys_into_filtered_with_tx_ids(block, tx_ids, keys_out)
}
/// Computes the transaction ids of `block` into the caller-provided
/// `tx_ids_buf` by delegating to `compute_block_tx_ids_into`.
pub fn compute_tx_ids_only(block: &Block, tx_ids_buf: &mut Vec<Hash>) {
    blvm_protocol::block::compute_block_tx_ids_into(block, tx_ids_buf);
}
/// Convenience wrapper: computes the block's transaction ids internally and
/// collects the filtered input keys into `keys_out`.
///
/// Returns the number of inputs skipped by
/// `block_input_keys_into_filtered_with_tx_ids`.
pub fn block_input_keys_into_filtered(block: &Block, keys_out: &mut Vec<OutPointKey>) -> usize {
    let computed_ids = blvm_protocol::block::compute_block_tx_ids(block);
    block_input_keys_into_filtered_with_tx_ids(block, &computed_ids, keys_out)
}
/// A group of pending UTXO-store changes: keys to delete plus key/UTXO pairs to
/// insert.
///
/// NOTE(review): nothing in the visible part of this file constructs or consumes
/// this type; the field notes below are inferred from names and types — confirm
/// against the call sites.
pub struct SyncBatch {
    /// Keys to remove from the store (presumably spent outputs).
    pub deletes: Vec<OutPointKey>,
    /// Key/UTXO pairs to write to the store (presumably newly created outputs).
    pub inserts: Vec<(OutPointKey, Arc<UTXO>)>,
    /// Signed delta associated with this batch — presumably the net change in
    /// entry count or tracked size; TODO confirm the unit.
    pub total_delta: isize,
}
/// Writes a list of pending changes to `disk`.
///
/// Each entry is either a put (`Some(utxo)`, encoded with `codec`) or a delete
/// (`None`). The input is processed in chunks of at most `MAX_BATCH_OPS`
/// entries; every chunk is written through its own storage batch and committed
/// before the next chunk starts, so a failure part-way through leaves earlier
/// chunks committed.
///
/// Returns the number of operations flushed (`0` for an empty input, in which
/// case nothing is logged or written).
///
/// # Errors
///
/// Propagates failures from creating a batch, encoding a UTXO, or committing.
pub fn flush_batch_to_disk(
    batch: &[(OutPointKey, PendingValue)],
    disk: &dyn Tree,
    codec: crate::storage::utxo_value_codec::ValueCodec,
) -> Result<usize> {
    if batch.is_empty() {
        return Ok(0);
    }

    let mut flushed_ops = 0usize;
    for ops in batch.chunks(MAX_BATCH_OPS) {
        let mut writer = disk.batch()?;
        for (key, pending) in ops.iter() {
            if let Some(utxo) = pending.as_deref() {
                let encoded =
                    crate::storage::utxo_value_codec::encode_utxo_with_codec(codec, utxo)?;
                writer.put(key.as_slice(), encoded.as_slice());
            } else {
                writer.delete(key.as_slice());
            }
        }
        writer.commit()?;
        flushed_ops += ops.len();
    }

    debug!(
        "flush_batch_to_disk: flushed {} operations to disk",
        flushed_ops
    );
    Ok(flushed_ops)
}
/// Tests for the heed3-specific read path of `load_keys_from_disk`.
#[cfg(all(test, feature = "heed3"))]
mod heed3_load_tests {
    use super::*;
    use crate::storage::database::{create_database, Database, DatabaseBackend, Tree};
    use crate::storage::rkyv_codec::{access_utxo, utxo_from_archived};
    use crate::storage::utxo_value_codec::{encode_utxo_with_codec, ValueCodec};
    use blvm_protocol::types::{OutPoint, UTXO};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Stores 64 rkyv-encoded UTXOs, loads them through `load_keys_from_disk`,
    /// and checks every entry against the value obtained by reading the raw
    /// bytes with `get_many_no_cache` and decoding them directly.
    #[test]
    fn load_keys_from_disk_heed3_zero_copy_matches_owned_path() {
        let scratch_dir = TempDir::new().unwrap();
        let database: Arc<dyn Database> =
            Arc::from(create_database(scratch_dir.path(), DatabaseBackend::Heed3, None).unwrap());
        let tree: Arc<dyn Tree> = Arc::from(database.open_tree("ibd_utxos").unwrap());

        // Populate the tree with 64 distinct entries and remember their keys.
        let mut keys = Vec::new();
        for i in 0..64u64 {
            let outpoint = OutPoint {
                hash: [i as u8; 32],
                index: 1,
            };
            let utxo = UTXO {
                value: (i as i64) * 500,
                script_pubkey: vec![0x51, (i & 0xff) as u8].into(),
                height: i,
                is_coinbase: false,
            };
            let key = outpoint_to_key(&outpoint);
            let encoded = encode_utxo_with_codec(ValueCodec::Rkyv, &utxo).unwrap();
            tree.insert(&key, &encoded).unwrap();
            keys.push(key);
        }

        let (zc_map, _) =
            load_keys_from_disk(Arc::clone(&tree), keys.clone(), ValueCodec::Rkyv).unwrap();
        assert_eq!(zc_map.len(), 64);

        // Reference path: fetch owned bytes and decode them independently.
        let key_slices = keys.iter().map(|k| k.as_slice()).collect::<Vec<_>>();
        let owned_values = tree.get_many_no_cache(&key_slices).unwrap();
        for (key, maybe_bytes) in keys.iter().zip(owned_values) {
            let zc = zc_map.get(key).expect("zero-copy map missing key");
            let data = maybe_bytes.expect("owned get missing key");
            let from_owned = utxo_from_archived(access_utxo(&data).unwrap());
            assert_eq!(*zc, from_owned);
        }
    }
}