use crate::error::WalletResult;
use crate::services::traits::WalletServices;
use crate::services::types::PostBeefResult;
use crate::storage::find_args::{
FindOutputsArgs, FindTransactionsArgs, OutputPartial, TransactionPartial,
};
use crate::storage::manager::WalletStorageManager;
use crate::storage::TrxToken;
#[derive(Debug, Clone)]
pub enum BroadcastOutcome {
Success,
ServiceError { details: Vec<String> },
InvalidTx { details: Vec<String> },
DoubleSpend {
competing_txs: Vec<String>,
details: Vec<String>,
},
OrphanMempool { details: Vec<String> },
}
pub fn classify_broadcast_results(results: &[PostBeefResult]) -> BroadcastOutcome {
let mut has_success = false;
let mut has_double_spend = false;
let mut has_orphan = false;
let mut has_invalid = false;
let mut competing_txs = Vec::new();
let mut details = Vec::new();
for provider_result in results {
if provider_result.status == "success" {
has_success = true;
}
for txid_result in &provider_result.txid_results {
if txid_result.status == "success" {
has_success = true;
continue;
}
let detail = format!(
"{}: {} (txid={})",
provider_result.name, txid_result.status, txid_result.txid
);
details.push(detail);
if txid_result.double_spend == Some(true) {
has_double_spend = true;
if let Some(ref ctxs) = txid_result.competing_txs {
competing_txs.extend(ctxs.iter().cloned());
}
} else if txid_result.orphan_mempool == Some(true) {
has_orphan = true;
} else if txid_result.service_error != Some(true) {
has_invalid = true;
}
}
}
if has_success {
BroadcastOutcome::Success
} else if has_double_spend {
BroadcastOutcome::DoubleSpend {
competing_txs,
details,
}
} else if has_invalid {
BroadcastOutcome::InvalidTx { details }
} else if has_orphan {
BroadcastOutcome::OrphanMempool { details }
} else {
BroadcastOutcome::ServiceError { details }
}
}
async fn reconcile_tx_status(
services: &(dyn WalletServices + Send + Sync),
txid: &str,
outcome: &BroadcastOutcome,
) -> BroadcastOutcome {
if matches!(
outcome,
BroadcastOutcome::Success
| BroadcastOutcome::ServiceError { .. }
| BroadcastOutcome::OrphanMempool { .. }
) {
return outcome.clone();
}
let status_result = services
.get_status_for_txids(&[txid.to_string()], false)
.await;
if status_result.status != "success" {
let detail = status_result
.error
.unwrap_or_else(|| "chain status query failed".to_string());
tracing::warn!(
txid = %txid,
detail = %detail,
"reconcile_tx_status: wrapper failure -- downgrading to ServiceError"
);
return BroadcastOutcome::ServiceError {
details: vec![detail],
};
}
for r in &status_result.results {
if r.status == "mined" || r.status == "known" {
tracing::info!(
txid = %txid,
chain_status = %r.status,
"reconcile_tx_status: chain says tx is valid, overriding failure to Success"
);
return BroadcastOutcome::Success;
}
}
outcome.clone()
}
async fn utxo_verified_input_ids(
storage: &WalletStorageManager,
services: &(dyn WalletServices + Send + Sync),
failed_transaction_id: i64,
trx: Option<&TrxToken>,
) -> WalletResult<Vec<i64>> {
let consumed_inputs = storage
.find_outputs_trx(
&FindOutputsArgs {
partial: OutputPartial {
spent_by: Some(failed_transaction_id),
..Default::default()
},
..Default::default()
},
trx,
)
.await?;
if consumed_inputs.is_empty() {
return Ok(Vec::new());
}
let mut verified: Vec<i64> = Vec::new();
for output in &consumed_inputs {
let parent_txid: String = if let Some(ref t) = output.txid {
t.clone()
} else {
let txs = storage
.find_transactions_trx(
&FindTransactionsArgs {
partial: TransactionPartial {
transaction_id: Some(output.transaction_id),
..Default::default()
},
..Default::default()
},
trx,
)
.await?;
match txs.first().and_then(|t| t.txid.clone()) {
Some(t) => t,
None => {
tracing::warn!(
output_id = output.output_id,
transaction_id = output.transaction_id,
"utxo_verified_input_ids: parent tx has no txid, skipping (not restoring)"
);
continue;
}
}
};
let locking_script = match output.locking_script.as_deref() {
Some(s) if !s.is_empty() => s,
_ => {
tracing::warn!(
output_id = output.output_id,
parent_txid = %parent_txid,
"utxo_verified_input_ids: missing locking_script, skipping (not restoring)"
);
continue;
}
};
let vout = output.vout as u32;
match services.is_utxo(locking_script, &parent_txid, vout).await {
Ok(true) => {
verified.push(output.output_id);
}
Ok(false) => {
tracing::info!(
output_id = output.output_id,
parent_txid = %parent_txid,
vout = vout,
"utxo_verified_input_ids: parent outpoint no longer unspent, not restoring"
);
}
Err(e) => {
tracing::warn!(
output_id = output.output_id,
parent_txid = %parent_txid,
vout = vout,
error = %e,
"utxo_verified_input_ids: is_utxo check errored, not restoring"
);
}
}
}
Ok(verified)
}
pub async fn apply_success_or_orphan_outcome(
storage: &WalletStorageManager,
txid: &str,
outcome: &BroadcastOutcome,
) -> WalletResult<()> {
let is_orphan = matches!(outcome, BroadcastOutcome::OrphanMempool { .. });
let new_tx_status = if is_orphan {
crate::status::TransactionStatus::Sending
} else {
crate::status::TransactionStatus::Unproven
};
let _ = storage.update_transaction_status(txid, new_tx_status).await;
let reqs = storage
.find_proven_tx_reqs(&crate::storage::find_args::FindProvenTxReqsArgs {
partial: crate::storage::find_args::ProvenTxReqPartial {
txid: Some(txid.to_string()),
..Default::default()
},
..Default::default()
})
.await
.unwrap_or_default();
let new_status = if is_orphan {
crate::status::ProvenTxReqStatus::Sending
} else {
crate::status::ProvenTxReqStatus::Unmined
};
for req in &reqs {
let _ = storage
.update_proven_tx_req(
req.proven_tx_req_id,
&crate::storage::find_args::ProvenTxReqPartial {
status: Some(new_status.clone()),
..Default::default()
},
)
.await;
}
Ok(())
}
pub async fn apply_service_error_outcome(
storage: &WalletStorageManager,
txid: &str,
details: Vec<String>,
) -> WalletResult<()> {
let reqs = storage
.find_proven_tx_reqs(&crate::storage::find_args::FindProvenTxReqsArgs {
partial: crate::storage::find_args::ProvenTxReqPartial {
txid: Some(txid.to_string()),
..Default::default()
},
..Default::default()
})
.await?;
for req in &reqs {
let new_attempts = req.attempts.saturating_add(1);
storage
.update_proven_tx_req(
req.proven_tx_req_id,
&crate::storage::find_args::ProvenTxReqPartial {
status: Some(crate::status::ProvenTxReqStatus::Sending),
attempts: Some(new_attempts),
..Default::default()
},
)
.await?;
}
storage
.update_transaction_status(txid, crate::status::TransactionStatus::Sending)
.await?;
tracing::warn!(
txid = %txid,
details = ?details,
"broadcast: service error -- transitioned tx+req to Sending for retry"
);
Ok(())
}
pub async fn handle_permanent_broadcast_failure(
storage: &WalletStorageManager,
services: &(dyn WalletServices + Send + Sync),
txid: &str,
outcome: &BroadcastOutcome,
) -> WalletResult<BroadcastOutcome> {
let effective = reconcile_tx_status(services, txid, outcome).await;
if matches!(effective, BroadcastOutcome::Success) {
return Ok(effective);
}
if let BroadcastOutcome::ServiceError { details } = &effective {
apply_service_error_outcome(storage, txid, details.clone()).await?;
return Ok(effective);
}
let (new_tx_status, req_status) = match &effective {
BroadcastOutcome::DoubleSpend { .. } => (
crate::status::TransactionStatus::Failed,
crate::status::ProvenTxReqStatus::DoubleSpend,
),
BroadcastOutcome::InvalidTx { .. } => (
crate::status::TransactionStatus::Failed,
crate::status::ProvenTxReqStatus::Invalid,
),
_ => return Ok(effective),
};
let tx_lookup = storage
.find_transactions_trx(
&FindTransactionsArgs {
partial: TransactionPartial {
txid: Some(txid.to_string()),
..Default::default()
},
..Default::default()
},
None,
)
.await?;
let tx = match tx_lookup.first() {
Some(t) => t.clone(),
None => {
tracing::warn!(
txid = %txid,
"handle_permanent_broadcast_failure: tx not found in storage"
);
return Ok(effective);
}
};
let double_spend_safe_ids: Option<Vec<i64>> =
if matches!(effective, BroadcastOutcome::DoubleSpend { .. }) {
Some(utxo_verified_input_ids(storage, services, tx.transaction_id, None).await?)
} else {
None
};
let db_trx = storage.begin_transaction().await?;
storage
.update_transaction_status_trx(txid, new_tx_status, Some(&db_trx))
.await?;
let reqs = storage
.find_proven_tx_reqs_trx(
&crate::storage::find_args::FindProvenTxReqsArgs {
partial: crate::storage::find_args::ProvenTxReqPartial {
txid: Some(txid.to_string()),
..Default::default()
},
..Default::default()
},
Some(&db_trx),
)
.await?;
for req in &reqs {
storage
.update_proven_tx_req_trx(
req.proven_tx_req_id,
&crate::storage::find_args::ProvenTxReqPartial {
status: Some(req_status.clone()),
..Default::default()
},
Some(&db_trx),
)
.await?;
}
let outputs = storage
.find_outputs_trx(
&FindOutputsArgs {
partial: OutputPartial {
transaction_id: Some(tx.transaction_id),
..Default::default()
},
..Default::default()
},
Some(&db_trx),
)
.await?;
for output in &outputs {
storage
.update_output_trx(
output.output_id,
&OutputPartial {
spendable: Some(false),
..Default::default()
},
Some(&db_trx),
)
.await?;
}
let restored_count: usize = match &effective {
BroadcastOutcome::DoubleSpend { .. } => {
let safe_ids = double_spend_safe_ids.as_deref().unwrap_or(&[]);
for output_id in safe_ids {
storage
.update_output_trx(
*output_id,
&OutputPartial {
spendable: Some(true),
..Default::default()
},
Some(&db_trx),
)
.await?;
}
safe_ids.len()
}
BroadcastOutcome::InvalidTx { .. } => {
let consumed_inputs = storage
.find_outputs_trx(
&FindOutputsArgs {
partial: OutputPartial {
spent_by: Some(tx.transaction_id),
spendable: Some(false),
..Default::default()
},
..Default::default()
},
Some(&db_trx),
)
.await?;
for input_output in &consumed_inputs {
storage
.update_output_trx(
input_output.output_id,
&OutputPartial {
spendable: Some(true),
..Default::default()
},
Some(&db_trx),
)
.await?;
}
consumed_inputs.len()
}
_ => 0,
};
storage.commit_transaction(db_trx).await?;
match &effective {
BroadcastOutcome::DoubleSpend { .. } => {
tracing::info!(
txid = %txid,
restored_inputs = restored_count,
"handle_permanent_broadcast_failure: DoubleSpend — restored chain-verified inputs to spendable"
);
}
BroadcastOutcome::InvalidTx { .. } => {
tracing::info!(
txid = %txid,
restored_inputs = restored_count,
"handle_permanent_broadcast_failure: InvalidTx — restored consumed inputs to spendable"
);
}
_ => {}
}
tracing::warn!(
txid = %txid,
outcome = ?effective,
outputs_marked_unspendable = outputs.len(),
"handle_permanent_broadcast_failure: tx marked failed, outputs marked unspendable"
);
Ok(effective)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::services::types::{PostBeefResult, PostTxResultForTxid};
fn success_result(provider: &str, txid: &str) -> PostBeefResult {
PostBeefResult {
name: provider.to_string(),
status: "success".to_string(),
error: None,
txid_results: vec![PostTxResultForTxid {
txid: txid.to_string(),
status: "success".to_string(),
already_known: None,
double_spend: None,
block_hash: None,
block_height: None,
competing_txs: None,
service_error: None,
orphan_mempool: None,
}],
}
}
fn double_spend_result(provider: &str, txid: &str) -> PostBeefResult {
PostBeefResult {
name: provider.to_string(),
status: "error".to_string(),
error: Some("DOUBLE_SPEND_ATTEMPTED".to_string()),
txid_results: vec![PostTxResultForTxid {
txid: txid.to_string(),
status: "error".to_string(),
already_known: None,
double_spend: Some(true),
block_hash: None,
block_height: None,
competing_txs: Some(vec!["competing_tx_123".to_string()]),
service_error: None,
orphan_mempool: None,
}],
}
}
fn orphan_result(provider: &str, txid: &str) -> PostBeefResult {
PostBeefResult {
name: provider.to_string(),
status: "error".to_string(),
error: Some("SEEN_IN_ORPHAN_MEMPOOL".to_string()),
txid_results: vec![PostTxResultForTxid {
txid: txid.to_string(),
status: "error".to_string(),
already_known: None,
double_spend: None,
block_hash: None,
block_height: None,
competing_txs: None,
service_error: None,
orphan_mempool: Some(true),
}],
}
}
fn service_error_result(provider: &str, txid: &str) -> PostBeefResult {
PostBeefResult {
name: provider.to_string(),
status: "error".to_string(),
error: Some("timeout".to_string()),
txid_results: vec![PostTxResultForTxid {
txid: txid.to_string(),
status: "error".to_string(),
already_known: None,
double_spend: None,
block_hash: None,
block_height: None,
competing_txs: None,
service_error: Some(true),
orphan_mempool: None,
}],
}
}
#[test]
fn test_success_wins_over_errors() {
let results = vec![
service_error_result("ARC", "tx1"),
success_result("WoC", "tx1"),
];
assert!(matches!(
classify_broadcast_results(&results),
BroadcastOutcome::Success
));
}
#[test]
fn test_double_spend_takes_precedence() {
let results = vec![
double_spend_result("ARC", "tx1"),
service_error_result("WoC", "tx1"),
];
match classify_broadcast_results(&results) {
BroadcastOutcome::DoubleSpend {
competing_txs,
details,
} => {
assert_eq!(competing_txs, vec!["competing_tx_123"]);
assert!(!details.is_empty());
}
other => panic!("Expected DoubleSpend, got {:?}", other),
}
}
#[test]
fn test_orphan_mempool_classified_correctly() {
let results = vec![orphan_result("ARC", "tx1")];
assert!(matches!(
classify_broadcast_results(&results),
BroadcastOutcome::OrphanMempool { .. }
));
}
#[test]
fn test_all_service_errors() {
let results = vec![
service_error_result("ARC", "tx1"),
service_error_result("WoC", "tx1"),
];
assert!(matches!(
classify_broadcast_results(&results),
BroadcastOutcome::ServiceError { .. }
));
}
#[test]
fn test_success_wins_over_double_spend() {
let results = vec![
double_spend_result("ARC", "tx1"),
success_result("WoC", "tx1"),
];
assert!(matches!(
classify_broadcast_results(&results),
BroadcastOutcome::Success
));
}
#[test]
fn test_orphan_not_classified_as_double_spend() {
let results = vec![orphan_result("ARC", "tx1")];
match classify_broadcast_results(&results) {
BroadcastOutcome::DoubleSpend { .. } => {
panic!("Orphan mempool should NOT be classified as double-spend")
}
BroadcastOutcome::OrphanMempool { .. } => {} other => panic!("Expected OrphanMempool, got {:?}", other),
}
}
}