use anyhow::Result;
use std::{
    cmp::Ordering,
    collections::{HashMap, HashSet},
};

use bitcoin_hashes::{hex::ToHex, Hash};
use bitcoincash::Txid;
use rayon::prelude::*;
// Note: #[derive(Serialize)] below assumes serde's `derive` feature; drop this
// import if the crate brings the derive macro into scope globally instead.
use serde::Serialize;
use sha1::Digest;
use sha2::Sha256;

use crate::{
    chaindef::{OutPointHash, ScriptHash, TokenID},
    indexes::{
        outputindex::OutputIndexRow,
        scripthashindex::{OutputFlags, ScriptHashIndexRow},
        DBRow,
    },
    mempool::{ConfirmationState, Tracker},
    query::queryutil::token_from_outpoint,
    store::{DBContents, DBStore},
    timeout::{
        par_timeout_collect, par_timeout_collect_boxed, par_timeout_collect_vec, TimeoutTrigger,
    },
};

use super::queryutil::{get_output_rows, height_by_txid, output_is_spent, tx_spending_outpoint};

#[derive(Serialize)]
pub struct HistoryItem {
    pub tx_hash: Txid,
    pub height: i32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fee: Option<u64>, // needs to be set only for unconfirmed transactions (i.e. height <= 0)
}

#[derive(Serialize)]
pub struct UnspentItem {
    #[serde(rename = "tx_hash")]
    pub txid: Txid,
    #[serde(rename = "tx_pos")]
    pub vout: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub outpoint_hash: Option<OutPointHash>,
    pub height: u32,
    pub value: u64,

    #[cfg(feature = "nexa")]
    #[serde(skip_serializing_if = "Option::is_none", rename = "token_id_hex")]
    pub token_id: Option<TokenID>,

    #[cfg(not(feature = "nexa"))]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub token_id: Option<TokenID>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub token_amount: Option<i64>,
}
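
// A sketch of the JSON emitted for one unspent output, derived from the serde
// attributes above (values are illustrative). With the "nexa" feature the token
// id is serialized as "token_id_hex" and the outpoint hash is included;
// optional fields are omitted when `None`:
//
//   {
//     "tx_hash": "<txid hex>",
//     "tx_pos": 0,
//     "outpoint_hash": "<outpoint hash hex>",
//     "height": 100000,
//     "value": 546,
//     "token_id_hex": "<token id hex>",
//     "token_amount": 10
//   }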

/**
 * For sorting history items by confirmation height (and then ID)
 */
pub(crate) fn by_block_height(a: &HistoryItem, b: &HistoryItem) -> Ordering {
    if a.height == b.height {
        // Order by little-endian tx hash if the height is the same;
        // in most cases this matches the order on the blockchain.
        return b.tx_hash.cmp(&a.tx_hash);
    }
    if a.height > 0 && b.height > 0 {
        return a.height.cmp(&b.height);
    }

    // Mempool txs should sort last, so add a large offset to their height.
    // Per spec, mempool entries do not need to be sorted, but we do it
    // anyway so that the status hash is deterministic.
    let mut a_height = a.height;
    let mut b_height = b.height;
    if a_height <= 0 {
        a_height = 0xEE_EEEE + a_height.abs();
    }
    if b_height <= 0 {
        b_height = 0xEE_EEEE + b_height.abs();
    }
    a_height.cmp(&b_height)
}
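
// A minimal ordering sketch for `by_block_height` (test-only): confirmed items
// sort by ascending height and mempool items (height <= 0) sort last. Assumes
// `Txid::all_zeros()` is available through the `bitcoin_hashes::Hash` trait
// imported above; adjust the constructor if Txid is built differently.
#[cfg(test)]
mod by_block_height_tests {
    use super::*;

    fn item(height: i32) -> HistoryItem {
        HistoryItem {
            tx_hash: Txid::all_zeros(),
            height,
            fee: None,
        }
    }

    #[test]
    fn confirmed_sorts_before_mempool() {
        let mut history = vec![item(0), item(2), item(1)];
        history.sort_unstable_by(by_block_height);
        let heights: Vec<i32> = history.iter().map(|h| h.height).collect();
        assert_eq!(heights, vec![1, 2, 0]);
    }
}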

/**
 * Find output rows given filter parameters.
 *
 * This is an expensive call used by most blockchain.scripthash.* queries.
 */
fn scan_for_outputs(
    store: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
) -> Box<impl '_ + ParallelIterator<Item = OutputIndexRow>> {
    let scripthash_filter = match output_flags {
        OutputFlags::None => ScriptHashIndexRow::filter_by_scripthash(scripthash.into_inner()),
        OutputFlags::HasTokens => {
            ScriptHashIndexRow::filter_by_scripthash_with_token(scripthash.into_inner())
        }
    };
    let scripthashes = store
        .scan(ScriptHashIndexRow::CF, scripthash_filter)
        .map(|r| ScriptHashIndexRow::from_row(&r))
        .map(|r| r.outpointhash())
        .collect::<Vec<OutPointHash>>();

    // get_output_rows is an expensive call, so we collect the outpoint hashes
    // above and then invoke it in parallel.
    Box::new(
        scripthashes
            .into_par_iter()
            .flat_map(move |outpointhash| {
                get_output_rows(store, &outpointhash)
                    .collect::<Vec<OutputIndexRow>>()
                    .into_par_iter()
            })
            .filter(move |o| {
                if let Some(filter) = &filter_token {
                    debug_assert!(output_flags == OutputFlags::HasTokens);
                    match token_from_outpoint(store, &o.hash()) {
                        Some((t, _)) => &t == filter,
                        None => false,
                    }
                } else {
                    true
                }
            }),
    )
}

/**
 * Calculate the coin balance of a scripthash that has been confirmed in blocks.
 */
pub fn confirmed_scripthash_balance(
    index: &DBStore,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<(i64, Vec<(i64, OutPointHash)>)> {
    assert!(index.contents == DBContents::ConfirmedIndex);
    let outputs = par_timeout_collect_vec::<(i64, OutPointHash)>(
        timeout,
        scan_for_outputs(index, scripthash, OutputFlags::None, None)
            .map(|o| (o.value(), o.take_hash()))
            .map(|(value, outpoint)| {
                if output_is_spent(index, outpoint) {
                    // Output was created AND spent in the same index. Zero it out.
                    (0, outpoint)
                } else {
                    (value as i64, outpoint)
                }
            }),
    )?;

    let amount = outputs.par_iter().map(|(value, _)| value).sum();

    // Only return unspent confirmed outputs
    let outputs = outputs
        .into_iter()
        .filter(|(amount, _)| amount > &0)
        .collect();

    Ok((amount, outputs))
}

/**
 * Calculate the unconfirmed (mempool) coin balance of a scripthash.
 *
 * Takes confirmed_outputs so that mempool spends of confirmed utxos can be detected.
 */
pub fn unconfirmed_scripthash_balance(
    mempool: &DBStore,
    confirmed_outputs: Vec<(i64, OutPointHash)>,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<i64> {
    assert!(DBContents::MempoolIndex == mempool.contents);

    let unconfirmed_outputs = par_timeout_collect_vec::<i64>(
        timeout,
        scan_for_outputs(mempool, scripthash, OutputFlags::None, None)
            .map(|o| (o.value(), o.take_hash()))
            .map(|(value, outpoint)| {
                if output_is_spent(mempool, outpoint) {
                    // Output was created AND spent in the same index. Zero it out.
                    0
                } else {
                    value as i64
                }
            }),
    )?;

    let amount: i64 = unconfirmed_outputs.par_iter().sum();

    // Subtract spends from confirmed utxos
    let spent_confirmed: i64 = confirmed_outputs
        .into_par_iter()
        .map(|(value, out)| {
            assert!(value >= 0);
            if output_is_spent(mempool, out) {
                value
            } else {
                0
            }
        })
        .sum::<i64>();

    Ok(amount - spent_confirmed)
}

// Used in .reduce for merging maps
fn merge_token_balance_maps(
    mut a: HashMap<TokenID, i64>,
    b: HashMap<TokenID, i64>,
) -> HashMap<TokenID, i64> {
    for (token_id, amount) in b {
        a.entry(token_id)
            .and_modify(|a| {
                *a += amount;
            })
            .or_insert(amount);
    }
    a
}
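
// A small sketch of the merge semantics (test-only): amounts for the same
// TokenID are summed. Assumes `TokenID::all_zeros()` is available, as used
// elsewhere in this module.
#[cfg(test)]
mod merge_token_balance_tests {
    use super::*;

    #[test]
    fn amounts_are_summed_per_token() {
        let token = TokenID::all_zeros();

        let mut a = HashMap::new();
        a.insert(token.clone(), 5);
        let mut b = HashMap::new();
        b.insert(token.clone(), 7);

        let merged = merge_token_balance_maps(a, b);
        assert_eq!(merged.get(&token), Some(&12));
    }
}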

/**
 * Calculate token balance in a given store.
 */
fn calc_token_balance(
    store: &DBStore,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<(HashMap<TokenID, i64>, Vec<OutPointHash>)> {
    let outputs = par_timeout_collect_vec::<OutPointHash>(
        timeout,
        scan_for_outputs(store, scripthash, OutputFlags::HasTokens, None).map(|o| o.take_hash()),
    )?;

    #[allow(clippy::redundant_closure)]
    let balance: HashMap<TokenID, i64> = outputs
        .par_iter()
        .map(|outpoint| {
            let (token_id, amount) = token_from_outpoint(store, outpoint)
                .expect("token info should exist for outpoint");
            if output_is_spent(store, *outpoint) {
                // Output was created AND spent in the same index. Zero it out.
                (token_id, 0)
            } else {
                (token_id, amount)
            }
        })
        .fold(
            || HashMap::<TokenID, i64>::default(),
            |mut map, (token_id, amount)| {
                map.entry(token_id)
                    .and_modify(|a| {
                        *a += amount;
                    })
                    .or_insert(amount);
                map
            },
        )
        .reduce(|| HashMap::default(), merge_token_balance_maps);

    Ok((balance, outputs))
}

/**
 * Get the unconfirmed token balance
 */
pub(crate) fn unconfirmed_scripthash_token_balance(
    mempool: &DBStore,
    index: &DBStore,
    confirmed_outputs: Vec<OutPointHash>,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<HashMap<TokenID, i64>> {
    assert!(mempool.contents == DBContents::MempoolIndex);
    assert!(index.contents == DBContents::ConfirmedIndex);

    // This finds the balance of outputs created in the mempool; outputs both
    // created and spent there cancel out. It cannot see spends of outputs
    // that have been confirmed.
    let (balance, _) = calc_token_balance(mempool, scripthash, timeout)?;

    // Find inputs that spend from confirmed outputs as well
    #[allow(clippy::redundant_closure)]
    let spends_of_confirmed = confirmed_outputs
        .into_par_iter()
        .map(|outpoint| {
            if output_is_spent(mempool, outpoint) {
                let (token_id, amount) = token_from_outpoint(index, &outpoint)
                    .expect("token info should exist for outpoint");
                (token_id, -amount)
            } else {
                // This output was funded in the confirmed index, not mempool.
                // If it's not spent here, then ignore it.
                (TokenID::all_zeros(), 0)
            }
        })
        .fold(
            || HashMap::<TokenID, i64>::default(),
            |mut map, (token_id, amount)| {
                map.entry(token_id)
                    .and_modify(|a| {
                        *a += amount;
                    })
                    .or_insert(amount);
                map
            },
        )
        .reduce(|| HashMap::default(), merge_token_balance_maps);

    // Merge in spends of confirmed outputs
    let mut balance = merge_token_balance_maps(balance, spends_of_confirmed);
    balance.remove(&TokenID::all_zeros());

    Ok(balance)
}

/**
 * Get the confirmed token balance
 */
pub(crate) fn confirmed_scripthash_token_balance(
    store: &DBStore,
    scripthash: ScriptHash,
    timeout: &TimeoutTrigger,
) -> Result<(HashMap<TokenID, i64>, Vec<OutPointHash>)> {
    calc_token_balance(store, scripthash, timeout)
}

/**
 * Find all transactions that spend from or fund a scripthash.
 *
 * Parameter 'additional_outputs' is for funding utxos from other indexes.
 * For the mempool, this parameter is needed to locate spends of confirmed outputs
 * (as those outputs are not funded in the mempool).
 */
pub(crate) fn scripthash_transactions(
    store: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    additional_outputs: Vec<OutPointHash>,
    timeout: &TimeoutTrigger,
) -> Result<(HashSet<Txid>, Vec<OutPointHash>)> {
    let outputs = par_timeout_collect_boxed::<OutputIndexRow>(
        timeout,
        scan_for_outputs(store, scripthash, output_flags, filter_token),
    )?;

    let (mut funder_txid, outpoints): (HashSet<Txid>, Vec<OutPointHash>) =
        outputs.into_iter().map(|o| (o.txid(), o.hash())).unzip();

    let spender_txids: HashSet<Txid> = par_timeout_collect::<Txid, HashSet<Txid>>(
        timeout,
        outpoints
            .par_iter()
            .chain(additional_outputs.par_iter())
            .filter_map(|o| tx_spending_outpoint(store, o).map(|input| input.txid())),
    )?;
    funder_txid.extend(spender_txids.into_iter());
    Ok((funder_txid, outpoints))
}

/**
 * Get outputs that have been confirmed on the blockchain given filter parameters.
 */
pub(crate) fn get_confirmed_outputs(
    index: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<OutPointHash>> {
    assert!(index.contents == DBContents::ConfirmedIndex);
    par_timeout_collect::<OutPointHash, _>(
        timeout,
        scan_for_outputs(index, scripthash, output_flags, filter_token).map(|o| o.hash()),
    )
}

/**
 * Generate a list of HistoryItem for a scripthash's confirmed history.
 *
 * Also returns confirmed outputs that can be used to get unconfirmed spends
 * of confirmed utxos.
 */
fn confirmed_history(
    store: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<(Vec<HistoryItem>, Vec<OutPointHash>)> {
    let (txids, outputs) = scripthash_transactions(
        store,
        scripthash,
        output_flags,
        filter_token,
        vec![],
        timeout,
    )?;

    let history = txids
        .into_par_iter()
        .map(|txid| {
            let height = match height_by_txid(store, txid) {
                Some(h) => h,
                None => {
                    debug_assert!(false, "Confirmed tx cannot be missing height");
                    0
                }
            };
            HistoryItem {
                tx_hash: txid,
                height: height as i32,
                fee: None,
            }
        })
        .collect();
    Ok((history, outputs))
}

/**
 * Generate a list of HistoryItem for a scripthash's unconfirmed history.
 */
pub fn unconfirmed_history(
    mempool: &Tracker,
    confirmed_outputs: Vec<OutPointHash>,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<HistoryItem>> {
    let (txids, _) = scripthash_transactions(
        mempool.index(),
        scripthash,
        output_flags,
        filter_token,
        confirmed_outputs,
        timeout,
    )?;

    Ok(txids
        .into_par_iter()
        .map(|txid| {
            let height = match mempool.tx_confirmation_state(&txid, None) {
                ConfirmationState::InMempool => 0,
                ConfirmationState::UnconfirmedParent => -1,
                ConfirmationState::Indeterminate | ConfirmationState::Confirmed => {
                    debug_assert!(
                        false,
                        "Mempool tx's state cannot be indeterminate or confirmed"
                    );
                    0
                }
            };
            HistoryItem {
                tx_hash: txid,
                height,
                fee: mempool.get_fee(&txid),
            }
        })
        .collect())
}

pub fn scripthash_history(
    store: &DBStore,
    mempool: &Tracker,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<HistoryItem>> {
    let (mut history, confirmed_outputs) = confirmed_history(
        store,
        scripthash,
        output_flags,
        filter_token.clone(),
        timeout,
    )?;

    history.extend(
        unconfirmed_history(
            mempool,
            confirmed_outputs,
            scripthash,
            output_flags,
            filter_token,
            timeout,
        )?
        .into_iter(),
    );

    history.par_sort_unstable_by(by_block_height);

    Ok(history)
}

/**
 * Generate a hash of the scripthash history as defined in the Electrum protocol spec.
 */
pub fn hash_scripthash_history(history: &Vec<HistoryItem>) -> Option<[u8; 32]> {
    if history.is_empty() {
        None
    } else {
        let mut sha2 = Sha256::new();
        let parts: Vec<String> = history
            .into_par_iter()
            .map(|t| format!("{}:{}:", t.tx_hash.to_hex(), t.height))
            .collect();

        for p in parts {
            sha2.update(p.as_bytes());
        }
        Some(sha2.finalize().into())
    }
}
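
// A minimal sketch of the status hash derivation (test-only): the result equals
// the SHA-256 of the concatenated "tx_hash:height:" strings, and an empty
// history yields None. Assumes `Txid::all_zeros()` is available through the
// `bitcoin_hashes::Hash` trait imported above.
#[cfg(test)]
mod statushash_tests {
    use super::*;

    #[test]
    fn matches_manual_sha256_and_empty_history_is_none() {
        let item = HistoryItem {
            tx_hash: Txid::all_zeros(),
            height: 1,
            fee: None,
        };

        let mut sha2 = Sha256::new();
        sha2.update(format!("{}:{}:", item.tx_hash.to_hex(), item.height).as_bytes());
        let expected: [u8; 32] = sha2.finalize().into();

        assert_eq!(hash_scripthash_history(&vec![item]), Some(expected));
        assert_eq!(hash_scripthash_history(&Vec::new()), None);
    }
}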

pub(crate) fn scripthash_listunspent(
    store: &DBStore,
    mempool: &DBStore,
    scripthash: ScriptHash,
    output_flags: OutputFlags,
    filter_token: Option<TokenID>,
    timeout: &TimeoutTrigger,
) -> Result<Vec<UnspentItem>> {
    assert!(store.contents == DBContents::ConfirmedIndex);
    assert!(mempool.contents == DBContents::MempoolIndex);

    let confirmed = scan_for_outputs(store, scripthash, output_flags, filter_token.clone())
        .filter(|out| !output_is_spent(store, out.hash()))
        .map(|out| {
            let txid = out.txid();

            (out, height_by_txid(store, txid).expect("height missing"))
        });
    let unconfirmed = scan_for_outputs(mempool, scripthash, output_flags, filter_token.clone())
        .filter(|out| !output_is_spent(mempool, out.hash()))
        .map(|out| {
            (out, 0 /* mempool height */)
        });

    let unspent_outputs = confirmed.chain(unconfirmed);

    // Fetch token info
    let unspent_outputs = unspent_outputs.filter_map(|(output, height)| {
        if output_flags == OutputFlags::HasTokens {
            let (token_id, amount) =
                token_from_outpoint(store, &output.hash()).unwrap_or_else(|| {
                    token_from_outpoint(mempool, &output.hash()).expect("missing token info")
                });

            if let Some(filter) = &filter_token {
                if filter == &token_id {
                    Some((output, height, Some(token_id), Some(amount)))
                } else {
                    None
                }
            } else {
                Some((output, height, Some(token_id), Some(amount)))
            }
        } else {
            Some((output, height, None, None))
        }
    });

    // Collect into json-serializable objects
    let unspent_outputs = unspent_outputs.map(|(out, height, token_id, token_amount)| {
        #[cfg(feature = "nexa")]
        {
            // With nexa, we also want outpoint hash
            UnspentItem {
                txid: out.txid(),
                vout: out.index(),
                outpoint_hash: Some(out.hash()),
                height,
                value: out.value(),
                token_id,
                token_amount,
            }
        }
        #[cfg(not(feature = "nexa"))]
        {
            UnspentItem {
                txid: out.txid(),
                vout: out.index(),
                outpoint_hash: None,
                height,
                value: out.value(),
                token_id,
                token_amount,
            }
        }
    });

    let mut unspent = par_timeout_collect_vec::<UnspentItem>(timeout, unspent_outputs)?;

    unspent.par_sort_by_key(|u| u.height);

    Ok(unspent)
}