use std::collections::HashSet;
use std::io::Cursor;
use std::time::{SystemTime, UNIX_EPOCH};
use bsv::transaction::beef::{Beef, BEEF_V2};
use bsv::transaction::transaction::Transaction as BsvTransaction;
use crate::error::WalletResult;
use crate::services::traits::WalletServices;
use crate::services::types::GetMerklePathResult;
use crate::status::ProvenTxReqStatus;
use crate::storage::beef::{get_valid_beef_for_txid, TrustSelf};
use crate::storage::find_args::ProvenTxReqPartial;
use crate::storage::manager::WalletStorageManager;
use crate::tables::{MonitorEvent, ProvenTx, ProvenTxReq};
use crate::types::Chain;
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the error branch yields
/// 0 instead of panicking (same outcome as `unwrap_or_default()` on the
/// zero `Duration`).
pub fn now_msecs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Persists a monitor event (name plus free-form detail text) to storage.
///
/// Both timestamps are taken from the current UTC wall clock; `id: 0` lets
/// the storage layer assign the real primary key on insert. Storage errors
/// propagate to the caller via `?`.
pub async fn log_event(
    storage: &WalletStorageManager,
    event: &str,
    details: &str,
) -> WalletResult<()> {
    let timestamp = chrono::Utc::now().naive_utc();
    storage
        .insert_monitor_event(&MonitorEvent {
            created_at: timestamp,
            updated_at: timestamp,
            id: 0,
            event: event.to_owned(),
            details: Some(details.to_owned()),
        })
        .await?;
    Ok(())
}
/// Outcome classification for one broadcast attempt of a single request.
///
/// Fieldless, so it also derives `Copy`: values are freely duplicated into
/// log lines and `PostReqDetail` records without explicit clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostReqStatus {
    /// At least one service accepted the transaction and none reported a
    /// double spend.
    Success,
    /// Some service reported the transaction double-spends an input.
    DoubleSpend,
    /// The request is malformed (no raw tx, unparsable, unserializable) or
    /// was rejected with a hard status error.
    Invalid,
    /// Every service failed without a definitive verdict; retryable.
    ServiceError,
    /// A service reported the transaction landed in the orphan mempool.
    Orphan,
    /// Outcome could not be determined (e.g. missing source BEEF data).
    Unknown,
}
/// Per-request outcome of one `attempt_to_post_reqs_to_network` pass.
#[derive(Debug)]
pub struct PostReqDetail {
    // Transaction id of the request that was (or was not) posted.
    pub txid: String,
    // Primary key of the `ProvenTxReq` row this detail refers to.
    pub req_id: i64,
    // Broadcast outcome for this request.
    pub status: PostReqStatus,
    // True when the request's status was updated but cascading the matching
    // transaction status to the transactions table failed.
    pub cascade_update_failed: bool,
}
/// Aggregate result of `attempt_to_post_reqs_to_network`.
#[derive(Debug)]
pub struct PostReqsToNetworkResult {
    // One entry per request that was actually processed; requests skipped
    // early (already Completed/Unmined, or storage inactive) are logged only.
    pub details: Vec<PostReqDetail>,
    // Human-readable, newline-separated processing log for diagnostics.
    pub log: String,
}
pub async fn attempt_to_post_reqs_to_network(
storage: &WalletStorageManager,
services: &dyn WalletServices,
reqs: &[ProvenTxReq],
) -> WalletResult<PostReqsToNetworkResult> {
let mut result = PostReqsToNetworkResult {
details: Vec::new(),
log: String::new(),
};
if reqs.is_empty() {
return Ok(result);
}
for req in reqs {
if matches!(
req.status,
ProvenTxReqStatus::Completed | ProvenTxReqStatus::Unmined
) {
result.log.push_str(&format!(
" req {} txid {}: skipped (already {:?})\n",
req.proven_tx_req_id, req.txid, req.status
));
continue;
}
if req.raw_tx.is_empty() {
result.log.push_str(&format!(
" req {} txid {}: invalid (noRawTx=true)\n",
req.proven_tx_req_id, req.txid
));
let update = ProvenTxReqPartial {
status: Some(ProvenTxReqStatus::Invalid),
..Default::default()
};
if let Err(e) = storage
.update_proven_tx_req(req.proven_tx_req_id, &update)
.await
{
result.log.push_str(&format!(
" req {} txid {}: warn update_proven_tx_req(Invalid) failed: {}\n",
req.proven_tx_req_id, req.txid, e
));
}
result.details.push(PostReqDetail {
txid: req.txid.clone(),
req_id: req.proven_tx_req_id,
status: PostReqStatus::Invalid,
cascade_update_failed: false,
});
continue;
}
let active = match storage.active() {
Some(a) => a.clone(),
None => {
result.log.push_str(&format!(
" req {} txid {}: skipped (storage not active)\n",
req.proven_tx_req_id, req.txid
));
continue;
}
};
let mut beef = Beef::new(BEEF_V2);
if let Some(ref ib) = req.input_beef {
if !ib.is_empty() {
if let Err(e) = beef.merge_beef_from_binary(ib) {
result.log.push_str(&format!(
" req {} txid {}: warn mergeInputBeef failed \
(will refetch from storage): {}\n",
req.proven_tx_req_id, req.txid, e
));
}
}
}
if let Err(e) = beef.merge_raw_tx(&req.raw_tx, None) {
result.log.push_str(&format!(
" req {} txid {}: invalid (mergeRawTxFailed: {})\n",
req.proven_tx_req_id, req.txid, e
));
let update = ProvenTxReqPartial {
status: Some(ProvenTxReqStatus::Invalid),
..Default::default()
};
if let Err(e) = storage
.update_proven_tx_req(req.proven_tx_req_id, &update)
.await
{
result.log.push_str(&format!(
" req {} txid {}: warn update_proven_tx_req(Invalid) failed: {}\n",
req.proven_tx_req_id, req.txid, e
));
}
result.details.push(PostReqDetail {
txid: req.txid.clone(),
req_id: req.proven_tx_req_id,
status: PostReqStatus::Invalid,
cascade_update_failed: false,
});
continue;
}
let parsed_tx = match BsvTransaction::from_binary(&mut Cursor::new(&req.raw_tx)) {
Ok(t) => t,
Err(e) => {
result.log.push_str(&format!(
" req {} txid {}: invalid (parseTxFailed: {})\n",
req.proven_tx_req_id, req.txid, e
));
let update = ProvenTxReqPartial {
status: Some(ProvenTxReqStatus::Invalid),
..Default::default()
};
if let Err(e) = storage
.update_proven_tx_req(req.proven_tx_req_id, &update)
.await
{
result.log.push_str(&format!(
" req {} txid {}: warn update_proven_tx_req(Invalid) failed: {}\n",
req.proven_tx_req_id, req.txid, e
));
}
result.details.push(PostReqDetail {
txid: req.txid.clone(),
req_id: req.proven_tx_req_id,
status: PostReqStatus::Invalid,
cascade_update_failed: false,
});
continue;
}
};
let known_txids: HashSet<String> = HashSet::new();
let mut missing_source = false;
for input in &parsed_tx.inputs {
let source_txid = match &input.source_txid {
Some(t) => t.clone(),
None => continue,
};
if source_txid.is_empty() || beef.find_txid(&source_txid).is_some() {
continue;
}
match get_valid_beef_for_txid(&*active, &source_txid, TrustSelf::No, &known_txids).await
{
Ok(Some(src_bytes)) => {
if src_bytes.is_empty() {
missing_source = true;
result.log.push_str(&format!(
" req {} txid {}: empty source BEEF for {}\n",
req.proven_tx_req_id, req.txid, source_txid
));
} else if let Err(e) = beef.merge_beef_from_binary(&src_bytes) {
missing_source = true;
result.log.push_str(&format!(
" req {} txid {}: mergeSourceBeefFailed for {}: {}\n",
req.proven_tx_req_id, req.txid, source_txid, e
));
}
}
Ok(None) => {
missing_source = true;
result.log.push_str(&format!(
" req {} txid {}: missing source BEEF for {}\n",
req.proven_tx_req_id, req.txid, source_txid
));
}
Err(e) => {
missing_source = true;
result.log.push_str(&format!(
" req {} txid {}: storage error fetching source BEEF for {}: {}\n",
req.proven_tx_req_id, req.txid, source_txid, e
));
}
}
}
if missing_source {
result.details.push(PostReqDetail {
txid: req.txid.clone(),
req_id: req.proven_tx_req_id,
status: PostReqStatus::Unknown,
cascade_update_failed: false,
});
continue;
}
let mut beef_bytes = Vec::new();
if let Err(e) = beef.to_binary(&mut beef_bytes) {
result.log.push_str(&format!(
" req {} txid {}: invalid (serializeFailed: {})\n",
req.proven_tx_req_id, req.txid, e
));
let update = ProvenTxReqPartial {
status: Some(ProvenTxReqStatus::Invalid),
..Default::default()
};
if let Err(e) = storage
.update_proven_tx_req(req.proven_tx_req_id, &update)
.await
{
result.log.push_str(&format!(
" req {} txid {}: warn update_proven_tx_req(Invalid) failed: {}\n",
req.proven_tx_req_id, req.txid, e
));
}
result.details.push(PostReqDetail {
txid: req.txid.clone(),
req_id: req.proven_tx_req_id,
status: PostReqStatus::Invalid,
cascade_update_failed: false,
});
continue;
}
let txids = vec![req.txid.clone()];
let post_results = services.post_beef(&beef_bytes, &txids).await;
let mut success_count = 0u32;
let mut double_spend_count = 0u32;
let mut orphan_count = 0u32;
let mut status_error_count = 0u32;
let mut service_error_count = 0u32;
for pbr in &post_results {
for tr in &pbr.txid_results {
if tr.txid == req.txid {
if tr.status == "success" {
success_count += 1;
} else if tr.double_spend.unwrap_or(false) {
double_spend_count += 1;
} else if tr.orphan_mempool.unwrap_or(false) {
orphan_count += 1;
} else if tr.service_error.unwrap_or(false) {
service_error_count += 1;
} else {
status_error_count += 1;
}
}
}
}
let (new_req_status, post_status) = if success_count > 0 && double_spend_count == 0 {
(ProvenTxReqStatus::Unmined, PostReqStatus::Success)
} else if double_spend_count > 0 {
(ProvenTxReqStatus::DoubleSpend, PostReqStatus::DoubleSpend)
} else if orphan_count > 0 {
(ProvenTxReqStatus::Sending, PostReqStatus::Orphan)
} else if status_error_count > 0 {
(ProvenTxReqStatus::Invalid, PostReqStatus::Invalid)
} else {
(ProvenTxReqStatus::Sending, PostReqStatus::ServiceError)
};
result.log.push_str(&format!(
" req {} txid {}: {} (success={}, dblSpend={}, orphan={}, err={}, svcErr={})\n",
req.proven_tx_req_id,
req.txid,
match &post_status {
PostReqStatus::Success => "unmined",
PostReqStatus::DoubleSpend => "doubleSpend",
PostReqStatus::Invalid => "invalid",
PostReqStatus::ServiceError => "serviceError",
PostReqStatus::Orphan => "orphan",
PostReqStatus::Unknown => "unknown",
},
success_count,
double_spend_count,
orphan_count,
status_error_count,
service_error_count
));
let update = ProvenTxReqPartial {
status: Some(new_req_status.clone()),
..Default::default()
};
if let Err(e) = storage
.update_proven_tx_req(req.proven_tx_req_id, &update)
.await
{
result.log.push_str(&format!(
" req {} txid {}: warn update_proven_tx_req({:?}) failed: {}\n",
req.proven_tx_req_id, req.txid, new_req_status, e
));
}
let new_tx_status = match new_req_status {
ProvenTxReqStatus::Unmined => Some(crate::status::TransactionStatus::Unproven),
ProvenTxReqStatus::DoubleSpend => Some(crate::status::TransactionStatus::Failed),
ProvenTxReqStatus::Invalid => Some(crate::status::TransactionStatus::Failed),
ProvenTxReqStatus::Sending => Some(crate::status::TransactionStatus::Sending),
_ => None,
};
let mut cascade_update_failed = false;
if let Some(tx_status) = new_tx_status {
let is_failed = matches!(tx_status, crate::status::TransactionStatus::Failed);
if let Err(e) = storage
.update_transaction_status(&req.txid, tx_status)
.await
{
cascade_update_failed = true;
result.log.push_str(&format!(
" req {} txid {}: warn update_transaction_status: {}\n",
req.proven_tx_req_id, req.txid, e
));
} else if is_failed {
match storage
.find_transactions(&crate::storage::find_args::FindTransactionsArgs {
partial: crate::storage::find_args::TransactionPartial {
txid: Some(req.txid.clone()),
..Default::default()
},
no_raw_tx: true,
..Default::default()
})
.await
{
Ok(txs) => {
if let Some(tx) = txs.first() {
if let Err(e) = storage.restore_consumed_inputs(tx.transaction_id).await
{
result.log.push_str(&format!(
" req {} txid {}: warn restore_consumed_inputs: {}\n",
req.proven_tx_req_id, req.txid, e
));
}
}
}
Err(e) => {
result.log.push_str(&format!(
" req {} txid {}: warn find_transactions for restore: {}\n",
req.proven_tx_req_id, req.txid, e
));
}
}
}
}
result.details.push(PostReqDetail {
txid: req.txid.clone(),
req_id: req.proven_tx_req_id,
status: post_status,
cascade_update_failed,
});
}
Ok(result)
}
/// Per-request outcome of a proof-polling pass.
///
/// NOTE(review): not constructed anywhere in this file — presumably consumed
/// or populated by callers; confirm it isn't dead code.
#[derive(Debug)]
pub struct GetProofDetail {
    // Transaction id the proof was requested for.
    pub txid: String,
    // Primary key of the `ProvenTxReq` row.
    pub req_id: i64,
    // Whether a merkle proof was obtained for this request.
    pub proven: bool,
}
/// Aggregate result of `get_proofs`.
#[derive(Debug)]
pub struct GetProofsResult {
    // Requests that were proven this pass (or already linked to a proven tx).
    pub proven: Vec<ProvenTxReq>,
    // Requests marked Invalid (e.g. exceeded the attempts limit).
    pub invalid: Vec<ProvenTxReq>,
    // Human-readable, newline-separated processing log for diagnostics.
    pub log: String,
}
/// Polls the merkle-path service for each request, promoting requests to
/// proven (`Completed`) and demoting over-attempted ones to `Invalid`.
///
/// * `unproven_attempts_limit` — requests whose prior `attempts` exceed this
///   are marked `Invalid`.
/// * `counts_as_attempt` — whether a "no proof yet" outcome should count as
///   another attempt (see NOTE(review) below: the increment is not persisted).
/// * `max_acceptable_height` — proofs from blocks above this height are
///   ignored this pass (guards against very new blocks).
///
/// Storage-update errors on the status fast paths are deliberately ignored
/// (`let _ =`); only the proof-saving path logs its error.
pub async fn get_proofs(
    storage: &WalletStorageManager,
    services: &dyn WalletServices,
    reqs: &[ProvenTxReq],
    _chain: &Chain,
    unproven_attempts_limit: u32,
    counts_as_attempt: bool,
    max_acceptable_height: Option<u32>,
) -> WalletResult<GetProofsResult> {
    let mut result = GetProofsResult {
        proven: Vec::new(),
        invalid: Vec::new(),
        log: String::new(),
    };
    if reqs.is_empty() {
        return Ok(result);
    }
    for req in reqs {
        result.log.push_str(&format!(
            " reqId {} txid {}: ",
            req.proven_tx_req_id, req.txid
        ));
        // Already linked to a proven tx: just (re)assert Completed status.
        if let Some(proven_tx_id) = req.proven_tx_id {
            if proven_tx_id > 0 {
                result
                    .log
                    .push_str(&format!("already linked to provenTxId {}.\n", proven_tx_id));
                let update = ProvenTxReqPartial {
                    status: Some(ProvenTxReqStatus::Completed),
                    notified: Some(false),
                    ..Default::default()
                };
                // Best-effort update; errors intentionally ignored.
                let _ = storage
                    .update_proven_tx_req(req.proven_tx_req_id, &update)
                    .await;
                result.proven.push(req.clone());
                continue;
            }
        }
        // Give up on requests that have exhausted their attempt budget.
        if req.attempts > unproven_attempts_limit as i32 {
            result
                .log
                .push_str(&format!("too many failed attempts {}\n", req.attempts));
            let update = ProvenTxReqPartial {
                status: Some(ProvenTxReqStatus::Invalid),
                notified: Some(false),
                ..Default::default()
            };
            // Best-effort update; errors intentionally ignored.
            let _ = storage
                .update_proven_tx_req(req.proven_tx_req_id, &update)
                .await;
            result.invalid.push(req.clone());
            continue;
        }
        let gmpr: GetMerklePathResult = services.get_merkle_path(&req.txid, false).await;
        if let (Some(merkle_path), Some(header)) = (&gmpr.merkle_path, &gmpr.header) {
            // Skip proofs from blocks that are too new to trust yet.
            if let Some(max_height) = max_acceptable_height {
                if header.height > max_height {
                    result.log.push_str(&format!(
                        "ignoring possible proof from very new block at height {} {}\n",
                        header.height, header.hash
                    ));
                    continue;
                }
            }
            let now = chrono::Utc::now().naive_utc();
            let proven_tx = ProvenTx {
                created_at: now,
                updated_at: now,
                // id 0: storage assigns the real primary key on insert.
                proven_tx_id: 0,
                txid: req.txid.clone(),
                height: header.height as i32,
                // NOTE(review): `index` is hardcoded to 0 — presumably the
                // tx's position within the block should be derived from the
                // merkle path; confirm.
                index: 0, merkle_path: merkle_path.clone(),
                raw_tx: req.raw_tx.clone(),
                block_hash: header.hash.clone(),
                merkle_root: header.merkle_root.clone(),
            };
            match storage
                .update_proven_tx_req_with_new_proven_tx(req.proven_tx_req_id, &proven_tx)
                .await
            {
                Ok(_proven_tx_id) => {
                    result.log.push_str(&format!(
                        "proven at height {} block {}\n",
                        header.height, header.hash
                    ));
                    result.proven.push(req.clone());
                }
                Err(e) => {
                    result.log.push_str(&format!("error saving proof: {}\n", e));
                }
            }
        } else {
            if counts_as_attempt && req.status != ProvenTxReqStatus::Nosend {
                // NOTE(review): the incremented attempt count is computed but
                // never persisted, so `attempts` appears never to grow from
                // this path and the attempts-limit check above may never
                // trigger — TODO confirm whether a storage update is intended
                // here.
                let _new_attempts = req.attempts + 1;
                result.log.push_str("no proof yet (attempt counted)\n");
            } else {
                result.log.push_str("no proof yet\n");
            }
        }
    }
    Ok(result)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `now_msecs` must be past 2020-01-01 (1_577_836_800_000 ms) on any
    /// correctly-clocked machine.
    #[test]
    fn test_now_msecs_returns_reasonable_value() {
        let now = now_msecs();
        assert!(now > 1_577_836_800_000);
    }

    #[test]
    fn test_post_req_status_variants() {
        assert_eq!(PostReqStatus::Success, PostReqStatus::Success);
        assert_ne!(PostReqStatus::Success, PostReqStatus::Invalid);
    }

    /// Mirrors the `max_acceptable_height` guard logic in `get_proofs`.
    #[test]
    fn test_max_acceptable_height_guard() {
        let max_acceptable_height: Option<u32> = Some(100);
        let proof_height: u32 = 101;
        let should_skip = max_acceptable_height
            .map(|max| proof_height > max)
            .unwrap_or(false);
        assert!(should_skip);
        let proof_height: u32 = 100;
        let should_skip = max_acceptable_height
            .map(|max| proof_height > max)
            .unwrap_or(false);
        assert!(!should_skip);
        let max: Option<u32> = None;
        let should_skip = max.map(|m| 101u32 > m).unwrap_or(false);
        assert!(!should_skip);
    }

    // Plain #[test]: the body awaits nothing, so the async runtime the
    // original #[tokio::test] spun up was pure overhead.
    #[test]
    fn test_get_proofs_empty_reqs_result() {
        let result = GetProofsResult {
            proven: Vec::new(),
            invalid: Vec::new(),
            log: String::new(),
        };
        assert!(result.proven.is_empty());
        assert!(result.invalid.is_empty());
        assert!(result.log.is_empty());
    }
}