use crate::bitcoin::consensus::serialize;
use crate::bitcoin::{OutPoint, TxOut};
use crate::utxo::UtxoStore;
use crate::BlockExtra;
use bitcoin_slices::redb::{self, Database, ReadableTable, TableDefinition};
use bitcoin_slices::{bsl, Parse};
use log::{debug, info};
use std::collections::HashMap;
use std::path::Path;
pub struct RedbUtxo {
db: Database,
updated_up_to_height: i32,
inserted_outputs: u64,
}
const UTXOS_TABLE: TableDefinition<bsl::OutPoint, bsl::TxOut> = TableDefinition::new("utxos");
const PREVOUTS_TABLE: TableDefinition<i32, bsl::TxOuts> = TableDefinition::new("prevouts");
const INTS_TABLE: TableDefinition<&str, i32> = TableDefinition::new("ints");
impl RedbUtxo {
pub fn new<P: AsRef<Path>>(path: P) -> Result<RedbUtxo, redb::Error> {
let db = Database::create(path)?;
let tables: Vec<_> = {
let read_txn = db.begin_read()?;
read_txn.list_tables()?.collect()
};
if tables.len() != 3 {
let write_txn = db.begin_write()?;
write_txn.open_table(UTXOS_TABLE)?;
write_txn.open_table(PREVOUTS_TABLE)?;
write_txn.open_table(INTS_TABLE)?;
write_txn.commit()?;
}
let updated_up_to_height = {
let read_txn = db.begin_read()?;
let table = read_txn.open_table(INTS_TABLE)?;
let result = table.get("height")?;
result.map(|a| a.value()).unwrap_or(-1)
};
info!("DB updated_height: {}", updated_up_to_height);
Ok(RedbUtxo {
db,
updated_up_to_height,
inserted_outputs: 0,
})
}
}
impl UtxoStore for RedbUtxo {
fn add_outputs_get_inputs(&mut self, block_extra: &BlockExtra, height: u32) -> Vec<TxOut> {
let block = block_extra.block();
let height = height as i32;
debug!(
"height: {} updated_up_to: {}",
height, self.updated_up_to_height
);
if height > self.updated_up_to_height {
let total_outputs = block.txdata.iter().map(|e| e.output.len()).sum();
let mut block_outputs = HashMap::with_capacity(total_outputs);
for (txid, tx) in block_extra.iter_tx() {
for (i, output) in tx.output.iter().enumerate() {
if !output.script_pubkey.is_op_return() {
let outpoint = OutPoint::new(*txid, i as u32);
block_outputs.insert(outpoint, output.clone());
}
}
}
let total_inputs = block.txdata.iter().skip(1).map(|e| e.input.len()).sum();
let mut prevouts = Vec::with_capacity(total_inputs);
let mut to_delete = Vec::with_capacity(total_outputs);
{
let read_txn = self.db.begin_read().unwrap();
let utxos_table = read_txn.open_table(UTXOS_TABLE).unwrap();
for tx in block.txdata.iter().skip(1) {
for input in tx.input.iter() {
match block_outputs.remove(&input.previous_output) {
Some(tx_out) => {
prevouts.push(tx_out.clone())
}
None => {
let outpoint_bytes = serialize(&input.previous_output);
let out_point = bsl::OutPoint::parse(&outpoint_bytes)
.unwrap()
.parsed_owned();
let tx_out_slice = utxos_table.get(&out_point).unwrap().unwrap();
let tx_out = tx_out_slice.value().into();
to_delete.push(outpoint_bytes);
prevouts.push(tx_out);
}
}
}
}
}
let mut write_txn = self.db.begin_write().unwrap();
if height % 10 != 0 {
write_txn.set_durability(redb::Durability::None);
}
{
let mut utxos_table = write_txn.open_table(UTXOS_TABLE).unwrap();
for el in to_delete {
let out_point = bsl::OutPoint::parse(&el).unwrap().parsed_owned();
utxos_table.remove(out_point).unwrap();
}
for (k, v) in block_outputs.drain() {
let tx_out_bytes = serialize(&v);
let tx_out = bsl::TxOut::parse(&tx_out_bytes).unwrap().parsed_owned();
let out_point_bytes = serialize(&k);
let out_point = bsl::OutPoint::parse(&out_point_bytes)
.unwrap()
.parsed_owned();
utxos_table.insert(out_point, tx_out).unwrap();
self.inserted_outputs += 1;
}
if !prevouts.is_empty() {
let mut prevouts_table = write_txn.open_table(PREVOUTS_TABLE).unwrap();
let tx_outs_bytes = serialize(&prevouts);
let tx_outs = bsl::TxOuts::parse(&tx_outs_bytes).unwrap().parsed_owned();
prevouts_table.insert(height, tx_outs).unwrap();
}
let mut prevouts_table = write_txn.open_table(INTS_TABLE).unwrap();
prevouts_table.insert("height", height).unwrap();
}
write_txn.commit().unwrap();
prevouts
} else if block.txdata.len() == 1 {
Vec::new()
} else {
let read_txn = self.db.begin_read().unwrap();
let prevouts_table = read_txn.open_table(PREVOUTS_TABLE).unwrap();
let tx_outs = prevouts_table.get(height).unwrap().unwrap();
tx_outs.value().iter().map(|e| e.into()).collect()
}
}
fn stat(&self) -> String {
format!(
"updated_up_to_height: {} inserted_outputs: {}",
self.updated_up_to_height, self.inserted_outputs
)
}
}
#[cfg(test)]
mod test {
use crate::{inner_test::test_conf, iter};
use test_log::test;
#[test]
fn test_blk_testnet_redb() {
let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let conf = {
let mut conf = test_conf();
conf.utxo_redb = Some(path.to_path_buf());
conf
};
let mut max_height = 0;
for b in iter(conf.clone()) {
max_height = max_height.max(b.height);
if b.height == 389 {
assert_eq!(b.fee(), Some(50_000));
assert_eq!(b.iter_tx().size_hint(), (2, Some(2)));
}
assert!(b.iter_tx().next().is_some());
for (txid, tx) in b.iter_tx() {
assert_eq!(*txid, tx.compute_txid());
}
}
assert_eq!(max_height, 400 - conf.max_reorg as u32);
for b in iter(conf) {
if b.height == 394 {
assert_eq!(b.fee(), Some(50_000));
}
}
}
}