use std::{
collections::BTreeSet,
time::{Duration, UNIX_EPOCH},
};
use bdk_core::{
bitcoin::{OutPoint, Txid},
CheckPoint, ConfirmationBlockTime, TxUpdate,
};
use electrum_streaming_client::{request, response, ElectrumScriptHash, ElectrumScriptStatus};
use crate::{req::ReqQueuer, Cache};
#[derive(Debug)]
pub enum SpkJobStage {
ProcessingHistory {
status: ElectrumScriptStatus,
},
ProcessingTxsAndAnchors {
txs: Option<TxsJobStage>,
anchors: BTreeSet<(u32, Txid)>,
},
}
impl SpkJobStage {
pub fn done() -> Self {
Self::ProcessingTxsAndAnchors {
txs: None,
anchors: BTreeSet::new(),
}
}
pub fn is_done(&self) -> bool {
matches!(self, SpkJobStage::ProcessingTxsAndAnchors { txs, anchors } if txs.is_none() && anchors.is_empty())
}
}
#[derive(Debug)]
pub enum TxsJobStage {
Txs(BTreeSet<Txid>),
Prevouts(BTreeSet<OutPoint>),
}
impl TxsJobStage {
pub fn from_missing_txs(txids: impl IntoIterator<Item = Txid>) -> Option<Self> {
let txids = txids.into_iter().collect::<BTreeSet<_>>();
if txids.is_empty() {
None
} else {
Some(Self::Txs(txids))
}
}
pub fn from_missing_prev_txs(outpoints: impl IntoIterator<Item = OutPoint>) -> Option<Self> {
let prev_txs = outpoints.into_iter().collect::<BTreeSet<_>>();
if prev_txs.is_empty() {
None
} else {
Some(Self::Prevouts(prev_txs))
}
}
}
#[derive(Debug)]
pub struct SpkJob {
pub start: Duration,
pub spk_hash: ElectrumScriptHash,
pub stage: SpkJobStage,
pub tx_update: TxUpdate<ConfirmationBlockTime>,
}
impl SpkJob {
pub fn new(
cache: &Cache,
spk_hash: ElectrumScriptHash,
spk_status: Option<ElectrumScriptStatus>,
) -> Self {
let start = UNIX_EPOCH.elapsed().expect("must get unix time");
let mut tx_update = TxUpdate::default();
let stage = match spk_status {
Some(status) => SpkJobStage::ProcessingHistory { status },
None => {
if let Some(prev_txids) = cache.spk_txids.get(&spk_hash) {
tx_update
.evicted_ats
.extend(prev_txids.iter().map(|&txid| (txid, start.as_secs())));
}
SpkJobStage::done()
}
};
Self {
start,
spk_hash,
stage,
tx_update,
}
}
pub fn elapsed_seconds(&self) -> String {
let duration = UNIX_EPOCH.elapsed().expect("must get current timestamp") - self.start;
let seconds = duration.as_secs();
let subsec = duration.subsec_millis();
format!("{seconds}s {subsec}ms")
}
pub fn advance(mut self, queuer: &mut ReqQueuer, cache: &Cache, cp: &CheckPoint) -> Self {
let mut made_progress = true;
while made_progress {
(self, made_progress) = self.try_advance_once(queuer, cache, cp.clone());
let stage_str = match &self.stage {
SpkJobStage::ProcessingHistory { status } => format!("ProcessingHistory({status})"),
SpkJobStage::ProcessingTxsAndAnchors { txs, anchors } => {
let inner_str = match txs {
Some(TxsJobStage::Txs(txids)) => format!("txs = {}", txids.len()),
Some(TxsJobStage::Prevouts(ops)) => format!("prevouts = {}", ops.len()),
None => "tx_done".to_string(),
};
format!(
"ProcessingTxsAndAnchors({inner_str}, anchors = {})",
anchors.len()
)
}
};
tracing::trace!(
elapsed_seconds = self.elapsed_seconds(),
spk_hash = self.spk_hash.to_string(),
stage = stage_str,
"Spk job progress"
);
}
self
}
pub fn try_finish(&mut self) -> Option<(ElectrumScriptHash, TxUpdate<ConfirmationBlockTime>)> {
if self.stage.is_done() {
tracing::trace!(
elapsed_seconds = self.elapsed_seconds(),
spk_hash = self.spk_hash.to_string(),
"Spk job not finished"
);
Some((self.spk_hash, core::mem::take(&mut self.tx_update)))
} else {
tracing::info!(
elapsed_seconds = self.elapsed_seconds(),
spk_hash = self.spk_hash.to_string(),
"Spk job finished"
);
None
}
}
fn try_advance_once(
mut self,
queuer: &mut ReqQueuer,
cache: &Cache,
tip: CheckPoint,
) -> (Self, bool) {
match self.stage {
SpkJobStage::ProcessingHistory { status } => match cache.spk_histories.get(&status) {
Some(history) => {
if let Some(prev_txids) = cache.spk_txids.get(&self.spk_hash) {
let these_txids =
history.iter().map(|tx| tx.txid()).collect::<BTreeSet<_>>();
let to_evict = prev_txids
.difference(&these_txids)
.map(|&txid| (txid, self.start.as_secs()));
self.tx_update.evicted_ats.extend(to_evict);
}
for tx in history {
if let response::Tx::Mempool(tx) = tx {
self.tx_update
.seen_ats
.insert((tx.txid, self.start.as_secs()));
}
}
let txs = TxsJobStage::from_missing_txs(history.iter().map(|tx| tx.txid()));
let anchors = history
.iter()
.filter_map(|tx| {
let height = tx.confirmation_height()?.to_consensus_u32();
Some((height, tx.txid()))
})
.collect();
self.stage = SpkJobStage::ProcessingTxsAndAnchors { txs, anchors };
(self, true)
}
None => {
let script_hash = self.spk_hash;
queuer.enqueue(request::GetHistory { script_hash });
(self, false)
}
},
SpkJobStage::ProcessingTxsAndAnchors {
mut txs,
mut anchors,
} => {
let mut made_progress = false;
txs = match txs {
Some(TxsJobStage::Txs(mut missing_txs)) => {
missing_txs.retain(|txid| match cache.txs.get(txid) {
Some(tx) => {
self.tx_update.txs.push(tx.clone());
false
}
None => {
let txid = *txid;
queuer.enqueue(request::GetTx { txid });
true
}
});
if missing_txs.is_empty() {
made_progress = true;
TxsJobStage::from_missing_prev_txs(
self.tx_update
.txs
.iter()
.filter(|tx| !tx.is_coinbase())
.flat_map(|tx| tx.input.iter())
.map(|txin| txin.previous_output),
)
} else {
Some(TxsJobStage::Txs(missing_txs))
}
}
Some(TxsJobStage::Prevouts(mut missing_prevouts)) => {
missing_prevouts.retain(|op| match cache.txs.get(&op.txid) {
Some(tx) => {
let txout = match tx.output.get(op.vout as usize) {
Some(txout) => txout,
None => {
debug_assert!(false, "Output must exist in tx");
unimplemented!("Handle this error");
}
};
self.tx_update.txouts.insert(*op, txout.clone());
false
}
None => {
let txid = op.txid;
queuer.enqueue(request::GetTx { txid });
true
}
});
if missing_prevouts.is_empty() {
made_progress = true;
None
} else {
Some(TxsJobStage::Prevouts(missing_prevouts))
}
}
None => None,
};
let anchors_start_count = anchors.len();
anchors.retain(|&(height, txid)| {
if height > tip.height() {
return false;
}
let blockhash = match tip.get(height) {
Some(cp) if cp.height() == height => cp.hash(),
_ => {
queuer.enqueue(request::Header { height });
return true;
}
};
if !cache.headers.contains_key(&blockhash) {
queuer.enqueue(request::Header { height });
}
if let Some(anchor) = cache.anchors.get(&(txid, blockhash)) {
self.tx_update.anchors.insert((*anchor, txid));
return false;
};
if cache.failed_anchors.contains(&(txid, blockhash)) {
return false;
}
queuer.enqueue(request::GetTxMerkle { txid, height });
true
});
if anchors.len() < anchors_start_count {
made_progress = true;
}
self.stage = SpkJobStage::ProcessingTxsAndAnchors { txs, anchors };
(self, made_progress)
}
}
}
}