use anyhow::Result;
use std::{
cmp::Ordering,
collections::{HashMap, HashSet},
};
use bitcoin_hashes::{hex::ToHex, Hash};
use bitcoincash::Txid;
use rayon::prelude::*;
use sha1::Digest;
use sha2::Sha256;
use crate::{
chaindef::{OutPointHash, ScriptHash, TokenID},
indexes::{
outputindex::OutputIndexRow,
scripthashindex::{OutputFlags, ScriptHashIndexRow},
DBRow,
},
mempool::{ConfirmationState, Tracker},
query::queryutil::token_from_outpoint,
store::{DBContents, DBStore},
timeout::{
par_timeout_collect, par_timeout_collect_boxed, par_timeout_collect_vec, TimeoutTrigger,
},
};
use super::queryutil::{get_output_rows, height_by_txid, output_is_spent, tx_spending_outpoint};
/// One entry of a scripthash's transaction history.
#[derive(Serialize)]
pub struct HistoryItem {
    /// Transaction id of the history entry.
    pub tx_hash: Txid,
    /// Block height; mempool transactions use 0 (all parents confirmed)
    /// or -1 (has unconfirmed parent) — see `unconfirmed_history`.
    pub height: i32,
    /// Transaction fee; only set for mempool entries (`confirmed_history`
    /// leaves it `None`) and omitted from serialization when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fee: Option<u64>,
}
/// One unspent output in a `listunspent`-style response.
#[derive(Serialize)]
pub struct UnspentItem {
    /// Funding transaction id (serialized as "tx_hash").
    #[serde(rename = "tx_hash")]
    pub txid: Txid,
    /// Output index within the funding transaction (serialized as "tx_pos").
    #[serde(rename = "tx_pos")]
    pub vout: u32,
    /// Outpoint hash; only populated on nexa builds (see
    /// `scripthash_listunspent`), omitted from serialization otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub outpoint_hash: Option<OutPointHash>,
    /// Confirmation height; 0 for mempool outputs.
    pub height: u32,
    /// Output value.
    pub value: u64,
    /// Token id when the output carries a token; nexa serializes the field
    /// as "token_id_hex", other builds as "token_id".
    #[cfg(feature = "nexa")]
    #[serde(skip_serializing_if = "Option::is_none", rename = "token_id_hex")]
    pub token_id: Option<TokenID>,
    #[cfg(not(feature = "nexa"))]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub token_id: Option<TokenID>,
    /// Token amount when the output carries a token.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub token_amount: Option<i64>,
}
/// Ordering for history items: confirmed entries ascend by height,
/// unconfirmed entries (height <= 0) sort after all confirmed ones, and
/// equal heights are tie-broken by reversed txid comparison so the order
/// is total and deterministic.
pub(crate) fn by_block_height(a: &HistoryItem, b: &HistoryItem) -> Ordering {
    // Equal heights: fall back to (reversed) txid ordering.
    if a.height == b.height {
        return b.tx_hash.cmp(&a.tx_hash);
    }
    // Mempool pseudo-heights (0 = in mempool, -1 = unconfirmed parent) are
    // remapped above 0xEE_EEEE — presumably beyond any real chain height —
    // so unconfirmed entries land after confirmed ones.
    let sort_key = |height: i32| {
        if height > 0 {
            height
        } else {
            0xEE_EEEE + height.abs()
        }
    };
    sort_key(a.height).cmp(&sort_key(b.height))
}
/// Produce a parallel iterator over all output rows paying to `scripthash`.
///
/// `output_flags` selects which scripthash-index rows are scanned: every
/// output, or only those flagged as carrying tokens. When `filter_token` is
/// set, the rows are further restricted to outputs holding exactly that
/// token; callers are expected to combine it with `OutputFlags::HasTokens`
/// (checked by the `debug_assert!` below).
fn scan_for_outputs(
    store: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
) -> Box<impl '_ + ParallelIterator<Item = OutputIndexRow>> {
    // Choose the index prefix filter matching the requested output class.
    let scripthash_filter = match output_flags {
        OutputFlags::None => ScriptHashIndexRow::filter_by_scripthash(scripthash.into_inner()),
        OutputFlags::HasTokens => {
            ScriptHashIndexRow::filter_by_scripthash_with_token(scripthash.into_inner())
        }
    };
    // Materialize the outpoint hashes first: the DB scan itself is
    // sequential; only the per-outpoint lookups below run on rayon.
    let scripthashes = store
        .scan(ScriptHashIndexRow::CF, scripthash_filter)
        .map(|r| ScriptHashIndexRow::from_row(&r))
        .map(|r| r.outpointhash())
        .collect::<Vec<OutPointHash>>();
    Box::new(
        scripthashes
            .into_par_iter()
            .flat_map(move |outpointhash| {
                get_output_rows(store, &outpointhash)
                    .collect::<Vec<OutputIndexRow>>()
                    .into_par_iter()
            })
            .filter(move |o| {
                if let Some(filter) = &filter_token {
                    debug_assert!(output_flags == OutputFlags::HasTokens);
                    // Keep only outputs whose token matches the filter; an
                    // output with no token info at all is dropped.
                    match token_from_outpoint(store, &o.hash()) {
                        Some((t, _)) => &t == filter,
                        None => false,
                    }
                } else {
                    true
                }
            }),
    )
}
/// Confirmed balance of a scripthash.
///
/// Returns the total confirmed value plus the list of `(value, outpoint)`
/// pairs for the outputs that are still unspent in the confirmed index.
pub fn confirmed_scripthash_balance(
    index: &DBStore,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<(i64, Vec<(i64, OutPointHash)>)> {
    assert!(index.contents == DBContents::ConfirmedIndex);
    // Scan all outputs; a spent output contributes value 0 but keeps its
    // outpoint so the caller can still see it was scanned.
    let scanned = par_timeout_collect_vec::<(i64, OutPointHash)>(
        timeout,
        scan_for_outputs(index, scripthash, OutputFlags::None, None).map(|out| {
            let value = out.value() as i64;
            let outpoint = out.take_hash();
            if output_is_spent(index, outpoint) {
                (0, outpoint)
            } else {
                (value, outpoint)
            }
        }),
    )?;
    let total: i64 = scanned.par_iter().map(|(value, _)| value).sum();
    // Only unspent (non-zero) entries are returned to the caller.
    let unspent = scanned
        .into_iter()
        .filter(|(value, _)| *value > 0)
        .collect();
    Ok((total, unspent))
}
/// Unconfirmed (mempool) balance delta of a scripthash.
///
/// Adds the value of outputs created by mempool transactions and subtracts
/// the value of `confirmed_outputs` that mempool transactions spend, so the
/// result can be negative.
pub fn unconfirmed_scripthash_balance(
    mempool: &DBStore,
    confirmed_outputs: Vec<(i64, OutPointHash)>,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<i64> {
    assert!(DBContents::MempoolIndex == mempool.contents);
    // Value created in the mempool; outputs already re-spent there count 0.
    let created_values = par_timeout_collect_vec::<i64>(
        timeout,
        scan_for_outputs(mempool, scripthash, OutputFlags::None, None).map(|out| {
            let value = out.value() as i64;
            let outpoint = out.take_hash();
            if output_is_spent(mempool, outpoint) {
                0
            } else {
                value
            }
        }),
    )?;
    let created: i64 = created_values.par_iter().sum();
    // Confirmed value that mempool transactions spend away.
    let spent_confirmed = confirmed_outputs
        .into_par_iter()
        .map(|(value, outpoint)| {
            assert!(value >= 0);
            if output_is_spent(mempool, outpoint) {
                value
            } else {
                0
            }
        })
        .sum::<i64>();
    Ok(created - spent_confirmed)
}
/// Merge two token-balance maps, summing the amounts of shared token ids.
fn merge_token_balance_maps(
    mut a: HashMap<TokenID, i64>,
    b: HashMap<TokenID, i64>,
) -> HashMap<TokenID, i64> {
    for (token_id, amount) in b {
        *a.entry(token_id).or_insert(0) += amount;
    }
    a
}
/// Token balances of a scripthash in `store`.
///
/// Returns the per-token amounts of all unspent token-bearing outputs, plus
/// the outpoint hashes of every token-bearing output that was scanned
/// (spent or not — spent ones contribute amount 0, so the token id still
/// appears in the map).
fn calc_token_balance(
    store: &DBStore,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<(HashMap<TokenID, i64>, Vec<OutPointHash>)> {
    let outputs = par_timeout_collect_vec::<OutPointHash>(
        timeout,
        scan_for_outputs(store, scripthash, OutputFlags::HasTokens, None).map(|o| o.take_hash()),
    )?;
    let balance: HashMap<TokenID, i64> = outputs
        .par_iter()
        .map(|outpoint| {
            // Every HasTokens-flagged output must have token info; a miss is
            // an index inconsistency. (`expect` takes a plain &str, so the
            // previous "{outpoint}" placeholder was never interpolated —
            // panic! formats it properly.)
            let (token_id, amount) = token_from_outpoint(store, outpoint)
                .unwrap_or_else(|| panic!("token info not found for outpoint {:?}", outpoint));
            if output_is_spent(store, *outpoint) {
                (token_id, 0)
            } else {
                (token_id, amount)
            }
        })
        // Per-thread partial maps, merged pairwise by rayon's reduce.
        .fold(HashMap::default, |mut map, (token_id, amount)| {
            *map.entry(token_id).or_insert(0) += amount;
            map
        })
        .reduce(HashMap::default, merge_token_balance_maps);
    Ok((balance, outputs))
}
/// Unconfirmed token-balance delta of a scripthash.
///
/// Combines the token amounts created by mempool transactions with the
/// (negative) amounts of `confirmed_outputs` that mempool transactions
/// spend. The result can therefore contain negative balances.
pub(crate) fn unconfirmed_scripthash_token_balance(
    mempool: &DBStore,
    index: &DBStore,
    confirmed_outputs: Vec<OutPointHash>,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<HashMap<TokenID, i64>> {
    assert!(mempool.contents == DBContents::MempoolIndex);
    assert!(index.contents == DBContents::ConfirmedIndex);
    let (balance, _) = calc_token_balance(mempool, scripthash, timeout)?;
    // Confirmed outputs spent by mempool txs are negated; unspent ones map
    // to the all-zeros sentinel, which is dropped again below.
    let spends_of_confirmed = confirmed_outputs
        .into_par_iter()
        .map(|outpoint| {
            if output_is_spent(mempool, outpoint) {
                // `expect` takes a plain &str, so the previous "{outpoint}"
                // placeholder was never interpolated — panic! formats it.
                let (token_id, amount) = token_from_outpoint(index, &outpoint)
                    .unwrap_or_else(|| {
                        panic!("token info not found for outpoint {:?}", outpoint)
                    });
                (token_id, -amount)
            } else {
                (TokenID::all_zeros(), 0)
            }
        })
        // Per-thread partial maps, merged pairwise by rayon's reduce.
        .fold(HashMap::default, |mut map, (token_id, amount)| {
            *map.entry(token_id).or_insert(0) += amount;
            map
        })
        .reduce(HashMap::default, merge_token_balance_maps);
    let mut balance = merge_token_balance_maps(balance, spends_of_confirmed);
    // Remove the sentinel entry contributed by unspent confirmed outputs.
    balance.remove(&TokenID::all_zeros());
    Ok(balance)
}
/// Confirmed token balances of a scripthash, together with the outpoint
/// hashes of all scanned token-bearing outputs.
pub(crate) fn confirmed_scripthash_token_balance(
    store: &DBStore,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<(HashMap<TokenID, i64>, Vec<OutPointHash>)> {
    // Thin wrapper; the actual work happens in calc_token_balance.
    calc_token_balance(store, scripthash, timeout)
}
/// All transactions touching a scripthash in `store`: the ones funding its
/// outputs plus the ones spending those outputs (and any
/// `additional_outputs` supplied by the caller).
///
/// Returns the combined txid set and the outpoint hashes of the funding
/// outputs found by the scan.
pub(crate) fn scripthash_transactions(
    store: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    additional_outputs: Vec<OutPointHash>,
    timeout: &TimeoutTrigger,
) -> Result<(HashSet<Txid>, Vec<OutPointHash>)> {
    // Outputs paying to this scripthash.
    let funding_rows = par_timeout_collect_boxed::<OutputIndexRow>(
        timeout,
        scan_for_outputs(store, scripthash, output_flags, filter_token),
    )?;
    let (mut txids, outpoints): (HashSet<Txid>, Vec<OutPointHash>) = funding_rows
        .into_iter()
        .map(|row| (row.txid(), row.hash()))
        .unzip();
    // Transactions spending any of these outputs (or the extra ones).
    let spenders = par_timeout_collect::<Txid, HashSet<Txid>>(
        timeout,
        outpoints
            .par_iter()
            .chain(additional_outputs.par_iter())
            .filter_map(|outpoint| tx_spending_outpoint(store, outpoint).map(|i| i.txid())),
    )?;
    txids.extend(spenders);
    Ok((txids, outpoints))
}
/// Outpoint hashes of every confirmed output paying to `scripthash`,
/// optionally restricted by `output_flags` / `filter_token`.
pub(crate) fn get_confirmed_outputs(
    index: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<OutPointHash>> {
    assert!(index.contents == DBContents::ConfirmedIndex);
    let outpoints =
        scan_for_outputs(index, scripthash, output_flags, filter_token).map(|row| row.hash());
    par_timeout_collect::<OutPointHash, _>(timeout, outpoints)
}
/// Confirmed transaction history of a scripthash, plus the outpoint hashes
/// of its confirmed funding outputs (needed later for mempool spend checks).
fn confirmed_history(
    store: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<(Vec<HistoryItem>, Vec<OutPointHash>)> {
    let (txids, outputs) =
        scripthash_transactions(store, scripthash, output_flags, filter_token, vec![], timeout)?;
    // One history entry per tx; heights come from the confirmed index.
    let history: Vec<HistoryItem> = txids
        .into_par_iter()
        .map(|txid| {
            let height = height_by_txid(store, txid).unwrap_or_else(|| {
                debug_assert!(false, "Confirmed tx cannot be missing height");
                0
            });
            HistoryItem {
                tx_hash: txid,
                height: height as i32,
                fee: None,
            }
        })
        .collect();
    Ok((history, outputs))
}
/// Mempool transaction history of a scripthash.
///
/// `confirmed_outputs` lets the scan also pick up mempool transactions that
/// spend the scripthash's confirmed outputs. Height encodes mempool state:
/// 0 = in mempool with confirmed parents, -1 = has an unconfirmed parent.
pub fn unconfirmed_history(
    mempool: &Tracker,
    confirmed_outputs: Vec<OutPointHash>,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<HistoryItem>> {
    let (txids, _) = scripthash_transactions(
        mempool.index(),
        scripthash,
        output_flags,
        filter_token,
        confirmed_outputs,
        timeout,
    )?;
    let history = txids
        .into_par_iter()
        .map(|txid| {
            let state = mempool.tx_confirmation_state(&txid, None);
            let height = match state {
                ConfirmationState::InMempool => 0,
                ConfirmationState::UnconfirmedParent => -1,
                ConfirmationState::Indeterminate | ConfirmationState::Confirmed => {
                    debug_assert!(
                        false,
                        "Mempool tx's state cannot be indeterminate or confirmed"
                    );
                    0
                }
            };
            HistoryItem {
                tx_hash: txid,
                height,
                fee: mempool.get_fee(&txid),
            }
        })
        .collect();
    Ok(history)
}
/// Full (confirmed + mempool) transaction history of a scripthash, sorted
/// by `by_block_height`.
pub fn scripthash_history(
    store: &DBStore,
    mempool: &Tracker,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<HistoryItem>> {
    // Confirmed part first: its outpoints are needed to find mempool spends.
    let (mut history, confirmed_outputs) =
        confirmed_history(store, scripthash, output_flags, filter_token.clone(), timeout)?;
    let mempool_items = unconfirmed_history(
        mempool,
        confirmed_outputs,
        scripthash,
        output_flags,
        filter_token,
        timeout,
    )?;
    history.extend(mempool_items);
    history.par_sort_unstable_by(by_block_height);
    Ok(history)
}
/// Status hash of a script's history: SHA-256 over the concatenation of
/// "txid_hex:height:" for every item, in the given order. Returns `None`
/// for an empty history.
///
/// Takes `&[HistoryItem]` instead of `&Vec<HistoryItem>` (clippy::ptr_arg);
/// existing `&Vec` callers still work through deref coercion.
pub fn hash_scripthash_history(history: &[HistoryItem]) -> Option<[u8; 32]> {
    if history.is_empty() {
        return None;
    }
    // Format the entries in parallel (rayon's collect preserves order),
    // then hash sequentially — Sha256 updates must be applied in order.
    let parts: Vec<String> = history
        .par_iter()
        .map(|item| format!("{}:{}:", item.tx_hash.to_hex(), item.height))
        .collect();
    let mut hasher = Sha256::new();
    for part in &parts {
        hasher.update(part.as_bytes());
    }
    Some(hasher.finalize().into())
}
/// List all unspent outputs (confirmed and mempool) paying to `scripthash`,
/// sorted by height; mempool entries use height 0 and thus sort first.
///
/// With `OutputFlags::HasTokens` each item carries token id and amount, and
/// `filter_token` restricts the result to a single token.
pub(crate) fn scripthash_listunspent(
    store: &DBStore,
    mempool: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<UnspentItem>> {
    assert!(store.contents == DBContents::ConfirmedIndex);
    assert!(mempool.contents == DBContents::MempoolIndex);
    // Confirmed outputs not spent in the confirmed index, paired with the
    // height of their funding transaction.
    let confirmed = scan_for_outputs(store, scripthash, output_flags, filter_token.clone())
        .filter(|out| !output_is_spent(store, out.hash()))
        .map(|out| {
            let txid = out.txid();
            (out, height_by_txid(store, txid).expect("height missing"))
        });
    // Mempool outputs not re-spent by other mempool txs; height 0 marks
    // them as unconfirmed.
    let unconfirmed = scan_for_outputs(mempool, scripthash, output_flags, filter_token.clone())
        .filter(|out| !output_is_spent(mempool, out.hash()))
        .map(|out| { (out, 0) });
    let unspent_outputs = confirmed.chain(unconfirmed);
    // Attach token data when requested: look up the confirmed index first,
    // fall back to the mempool index for mempool-created outputs.
    let unspent_outputs = unspent_outputs.filter_map(|(output, height)| {
        if output_flags == OutputFlags::HasTokens {
            let (token_id, amount) =
                token_from_outpoint(store, &output.hash()).unwrap_or_else(|| {
                    token_from_outpoint(mempool, &output.hash()).expect("missing token info")
                });
            if let Some(filter) = &filter_token {
                // NOTE(review): scan_for_outputs already applies this token
                // filter, so this re-check should always pass — confirm.
                if filter == &token_id {
                    Some((output, height, Some(token_id), Some(amount)))
                } else {
                    None
                }
            } else {
                Some((output, height, Some(token_id), Some(amount)))
            }
        } else {
            Some((output, height, None, None))
        }
    });
    // Build the response items; only nexa builds populate outpoint_hash.
    let unspent_outputs = unspent_outputs.map(|(out, height, token_id, token_amount)| {
        #[cfg(feature = "nexa")]
        {
            UnspentItem {
                txid: out.txid(),
                vout: out.index(),
                outpoint_hash: Some(out.hash()),
                height,
                value: out.value(),
                token_id,
                token_amount,
            }
        }
        #[cfg(not(feature = "nexa"))]
        {
            UnspentItem {
                txid: out.txid(),
                vout: out.index(),
                outpoint_hash: None,
                height,
                value: out.value(),
                token_id,
                token_amount,
            }
        }
    });
    let mut unspent = par_timeout_collect_vec::<UnspentItem>(timeout, unspent_outputs)?;
    unspent.par_sort_by_key(|u| u.height);
    Ok(unspent)
}