pub mod helpers;
pub mod task_trait;
pub mod tasks;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use tracing::{error, info, warn};
use crate::error::{WalletError, WalletResult};
use crate::services::traits::WalletServices;
use crate::services::types::BlockHeader;
use crate::storage::find_args::PurgeParams;
use crate::storage::manager::WalletStorageManager;
use crate::types::Chain;
use self::helpers::{log_event, now_msecs};
use self::task_trait::WalletMonitorTask;
pub const ONE_SECOND: u64 = 1000;
pub const ONE_MINUTE: u64 = 60 * ONE_SECOND;
pub const ONE_HOUR: u64 = 60 * ONE_MINUTE;
pub const ONE_DAY: u64 = 24 * ONE_HOUR;
pub const ONE_WEEK: u64 = 7 * ONE_DAY;
pub type AsyncCallback<T> =
Arc<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub struct MonitorOptions {
pub chain: Chain,
pub task_run_wait_msecs: u64,
pub abandoned_msecs: u64,
pub msecs_wait_per_merkle_proof_service_req: u64,
pub unproven_attempts_limit_test: u32,
pub unproven_attempts_limit_main: u32,
pub callback_token: Option<String>,
pub on_tx_broadcasted: Option<AsyncCallback<String>>,
pub on_tx_proven: Option<AsyncCallback<String>>,
pub on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
}
impl Default for MonitorOptions {
fn default() -> Self {
Self {
chain: Chain::Main,
task_run_wait_msecs: 5000,
abandoned_msecs: ONE_MINUTE * 5,
msecs_wait_per_merkle_proof_service_req: 500,
unproven_attempts_limit_test: 10,
unproven_attempts_limit_main: 144,
callback_token: None,
on_tx_broadcasted: None,
on_tx_proven: None,
on_tx_status_changed: None,
}
}
}
#[derive(Debug, Clone)]
pub struct DeactivatedHeader {
pub when_msecs: u64,
pub tries: u32,
pub header: BlockHeader,
}
pub struct Monitor {
pub options: MonitorOptions,
pub storage: Arc<WalletStorageManager>,
pub services: Arc<dyn WalletServices>,
pub chain: Chain,
tasks: Vec<Box<dyn WalletMonitorTask>>,
other_tasks: Vec<Box<dyn WalletMonitorTask>>,
running: Arc<AtomicBool>,
pub check_now: Arc<AtomicBool>,
pub last_new_header_height: Arc<AtomicU32>,
pub last_new_header: Option<BlockHeader>,
pub last_new_header_when: Option<u64>,
pub deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
join_handle: Option<tokio::task::JoinHandle<()>>,
run_async_setup: bool,
}
pub fn default_purge_params() -> PurgeParams {
PurgeParams {
purge_spent: false,
purge_completed: false,
purge_failed: true,
purge_spent_age: 2 * ONE_WEEK,
purge_completed_age: 2 * ONE_WEEK,
purge_failed_age: 5 * ONE_DAY,
purge_monitor_events: true,
purge_monitor_events_age: 30 * ONE_DAY,
}
}
impl Monitor {
pub fn builder() -> MonitorBuilder {
MonitorBuilder::new()
}
pub fn start_tasks(&mut self) -> WalletResult<()> {
if self.running.load(Ordering::SeqCst) {
return Err(WalletError::BadRequest(
"monitor tasks are already running".to_string(),
));
}
self.running.store(true, Ordering::SeqCst);
self.run_async_setup = true;
let running = self.running.clone();
let _check_now = self.check_now.clone();
let _deactivated_headers = self.deactivated_headers.clone();
let task_run_wait_msecs = self.options.task_run_wait_msecs;
let mut tasks: Vec<Box<dyn WalletMonitorTask>> = std::mem::take(&mut self.tasks);
let storage = self.storage.clone();
let handle = tokio::spawn(async move {
if let Err(e) = storage.make_available().await {
warn!("Monitor storage make_available failed: {e}");
}
for task in tasks.iter_mut() {
if !running.load(Ordering::SeqCst) {
break;
}
if let Err(e) = task.async_setup().await {
let details = format!("monitor task {} asyncSetup error: {}", task.name(), e);
warn!("{}", details);
let _ = log_event(&storage, "error0", &details).await;
}
}
while running.load(Ordering::SeqCst) {
let now = now_msecs();
let mut triggered_indices = Vec::new();
for (i, task) in tasks.iter_mut().enumerate() {
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
task.trigger(now)
})) {
Ok(should_run) => {
if should_run {
triggered_indices.push(i);
}
}
Err(_) => {
let details = format!("monitor task {} trigger panicked", task.name());
error!("{}", details);
let _ = log_event(&storage, "error0", &details).await;
}
}
}
for idx in triggered_indices {
if !running.load(Ordering::SeqCst) {
break;
}
let task = &mut tasks[idx];
match task.run_task().await {
Ok(log) => {
if !log.is_empty() {
info!("Task {} {}", task.name(), &log[..log.len().min(1024)]);
let _ = log_event(&storage, task.name(), &log).await;
}
}
Err(e) => {
let details =
format!("monitor task {} runTask error: {}", task.name(), e);
error!("{}", details);
let _ = log_event(&storage, "error1", &details).await;
}
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(task_run_wait_msecs)).await;
}
info!("Monitor polling loop stopped");
});
self.join_handle = Some(handle);
Ok(())
}
pub async fn stop_tasks(&mut self) {
self.running.store(false, Ordering::SeqCst);
if let Some(handle) = self.join_handle.take() {
let _ = handle.await;
}
}
pub async fn destroy(&mut self) {
self.stop_tasks().await;
}
pub async fn run_once(&mut self) -> WalletResult<()> {
if self.run_async_setup {
for task in self.tasks.iter_mut() {
if let Err(e) = task.async_setup().await {
let details = format!("monitor task {} asyncSetup error: {}", task.name(), e);
warn!("{}", details);
let _ = log_event(&self.storage, "error0", &details).await;
}
}
self.run_async_setup = false;
}
let now = now_msecs();
let mut triggered_indices = Vec::new();
for (i, task) in self.tasks.iter_mut().enumerate() {
if task.trigger(now) {
triggered_indices.push(i);
}
}
for idx in triggered_indices {
let task = &mut self.tasks[idx];
match task.run_task().await {
Ok(log) => {
if !log.is_empty() {
info!("Task {} {}", task.name(), &log[..log.len().min(1024)]);
let _ = log_event(&self.storage, task.name(), &log).await;
}
}
Err(e) => {
let details = format!("monitor task {} runTask error: {}", task.name(), e);
error!("{}", details);
let _ = log_event(&self.storage, "error1", &details).await;
}
}
}
Ok(())
}
pub async fn run_task(&mut self, name: &str) -> WalletResult<String> {
for task in self.tasks.iter_mut() {
if task.name() == name {
task.async_setup().await?;
return task.run_task().await;
}
}
for task in self.other_tasks.iter_mut() {
if task.name() == name {
task.async_setup().await?;
return task.run_task().await;
}
}
Err(WalletError::InvalidParameter {
parameter: "name".to_string(),
must_be: format!("an existing task name, '{}' not found", name),
})
}
pub fn process_new_block_header(&mut self, header: BlockHeader) {
self.last_new_header_height
.store(header.height, Ordering::SeqCst);
self.last_new_header = Some(header);
self.last_new_header_when = Some(now_msecs());
self.check_now.store(true, Ordering::SeqCst);
}
pub async fn process_reorg(
&self,
_depth: u32,
_old_tip: &BlockHeader,
_new_tip: &BlockHeader,
deactivated: Option<&[BlockHeader]>,
) {
if let Some(headers) = deactivated {
let mut queue = self.deactivated_headers.lock().await;
for header in headers {
queue.push(DeactivatedHeader {
when_msecs: now_msecs(),
tries: 0,
header: header.clone(),
});
}
}
}
pub fn process_header(&self, _header: &BlockHeader) {
}
pub async fn call_on_broadcasted_transaction(&self, broadcast_result: &str) {
if let Some(ref cb) = self.options.on_tx_broadcasted {
cb(broadcast_result.to_string()).await;
}
}
pub async fn call_on_proven_transaction(&self, tx_status: &str) {
if let Some(ref cb) = self.options.on_tx_proven {
cb(tx_status.to_string()).await;
}
}
pub async fn call_on_transaction_status_changed(&self, txid: &str, new_status: &str) {
if let Some(ref cb) = self.options.on_tx_status_changed {
cb((txid.to_string(), new_status.to_string())).await;
}
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
pub fn add_task(&mut self, task: Box<dyn WalletMonitorTask>) -> WalletResult<()> {
let name = task.name().to_string();
if self.tasks.iter().any(|t| t.name() == name) {
return Err(WalletError::BadRequest(format!(
"task {} has already been added",
name
)));
}
self.tasks.push(task);
Ok(())
}
pub fn remove_task(&mut self, name: &str) {
self.tasks.retain(|t| t.name() != name);
}
}
pub struct MonitorBuilder {
chain: Option<Chain>,
storage: Option<Arc<WalletStorageManager>>,
services: Option<Arc<dyn WalletServices>>,
options: MonitorOptions,
default_tasks: bool,
multi_user_tasks: bool,
extra_tasks: Vec<Box<dyn WalletMonitorTask>>,
removed_task_names: Vec<String>,
}
impl MonitorBuilder {
fn new() -> Self {
Self {
chain: None,
storage: None,
services: None,
options: MonitorOptions::default(),
default_tasks: false,
multi_user_tasks: false,
extra_tasks: Vec::new(),
removed_task_names: Vec::new(),
}
}
pub fn chain(mut self, chain: Chain) -> Self {
self.chain = Some(chain);
self
}
pub fn storage(mut self, storage: Arc<WalletStorageManager>) -> Self {
self.storage = Some(storage);
self
}
pub fn services(mut self, services: Arc<dyn WalletServices>) -> Self {
self.services = Some(services);
self
}
pub fn task_run_wait_msecs(mut self, msecs: u64) -> Self {
self.options.task_run_wait_msecs = msecs;
self
}
pub fn abandoned_msecs(mut self, msecs: u64) -> Self {
self.options.abandoned_msecs = msecs;
self
}
pub fn callback_token(mut self, token: String) -> Self {
self.options.callback_token = Some(token);
self
}
pub fn default_tasks(mut self) -> Self {
self.default_tasks = true;
self
}
pub fn multi_user_tasks(mut self) -> Self {
self.multi_user_tasks = true;
self
}
pub fn add_task(mut self, task: Box<dyn WalletMonitorTask>) -> Self {
self.extra_tasks.push(task);
self
}
pub fn remove_task(mut self, name: &str) -> Self {
self.removed_task_names.push(name.to_string());
self
}
pub fn on_tx_broadcasted(mut self, cb: AsyncCallback<String>) -> Self {
self.options.on_tx_broadcasted = Some(cb);
self
}
pub fn on_tx_proven(mut self, cb: AsyncCallback<String>) -> Self {
self.options.on_tx_proven = Some(cb);
self
}
pub fn on_tx_status_changed(mut self, cb: AsyncCallback<(String, String)>) -> Self {
self.options.on_tx_status_changed = Some(cb);
self
}
pub fn build(mut self) -> WalletResult<Monitor> {
let chain = self
.chain
.ok_or_else(|| WalletError::MissingParameter("chain".to_string()))?;
let storage = self
.storage
.ok_or_else(|| WalletError::MissingParameter("storage".to_string()))?;
let services = self
.services
.ok_or_else(|| WalletError::MissingParameter("services".to_string()))?;
self.options.chain = chain.clone();
let check_now = Arc::new(AtomicBool::new(false));
let last_new_header_height = Arc::new(AtomicU32::new(u32::MAX));
let deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let make_storage =
|s: &Arc<WalletStorageManager>| -> Arc<WalletStorageManager> { s.clone() };
let unproven_limit = match chain {
Chain::Test => self.options.unproven_attempts_limit_test,
_ => self.options.unproven_attempts_limit_main,
};
let mut tasks: Vec<Box<dyn WalletMonitorTask>> = Vec::new();
if self.default_tasks || self.multi_user_tasks {
tasks.push(Box::new(tasks::task_clock::TaskClock::new()));
tasks.push(Box::new(
tasks::task_monitor_call_history::TaskMonitorCallHistory::new(services.clone()),
));
tasks.push(Box::new(tasks::task_new_header::TaskNewHeader::new(
make_storage(&storage),
services.clone(),
check_now.clone(),
last_new_header_height.clone(),
)));
tasks.push(Box::new(tasks::task_send_waiting::TaskSendWaiting::new(
make_storage(&storage),
services.clone(),
chain.clone(),
self.options.on_tx_broadcasted.clone(),
)));
tasks.push(Box::new(
tasks::task_check_for_proofs::TaskCheckForProofs::new(
make_storage(&storage),
services.clone(),
chain.clone(),
check_now.clone(),
unproven_limit,
self.options.on_tx_proven.clone(),
last_new_header_height.clone(),
),
));
tasks.push(Box::new(tasks::task_check_no_sends::TaskCheckNoSends::new(
make_storage(&storage),
services.clone(),
chain.clone(),
unproven_limit,
last_new_header_height.clone(),
)));
tasks.push(Box::new(
tasks::task_fail_abandoned::TaskFailAbandoned::new(
make_storage(&storage),
self.options.abandoned_msecs,
),
));
tasks.push(Box::new(tasks::task_unfail::TaskUnFail::new(
make_storage(&storage),
services.clone(),
)));
tasks.push(Box::new(tasks::task_review_status::TaskReviewStatus::new(
make_storage(&storage),
)));
tasks.push(Box::new(
tasks::task_review_double_spends::TaskReviewDoubleSpends::new(
make_storage(&storage),
services.clone(),
),
));
tasks.push(Box::new(
tasks::task_review_proven_txs::TaskReviewProvenTxs::new(
make_storage(&storage),
services.clone(),
),
));
tasks.push(Box::new(tasks::task_review_utxos::TaskReviewUtxos::new(
make_storage(&storage),
)));
tasks.push(Box::new(tasks::task_reorg::TaskReorg::new(
make_storage(&storage),
services.clone(),
deactivated_headers.clone(),
)));
if self.default_tasks {
tasks.push(Box::new(tasks::task_arc_sse::TaskArcSse::new(
make_storage(&storage),
services.clone(),
self.options.callback_token.clone(),
self.options.on_tx_status_changed.clone(),
)));
tasks.push(Box::new(tasks::task_sync_when_idle::TaskSyncWhenIdle::new()));
}
tasks.push(Box::new(tasks::task_purge::TaskPurge::new(
make_storage(&storage),
default_purge_params(),
)));
}
for name in &self.removed_task_names {
tasks.retain(|t| t.name() != name.as_str());
}
tasks.append(&mut self.extra_tasks);
Ok(Monitor {
options: self.options,
storage,
services,
chain,
tasks,
other_tasks: Vec::new(),
running: Arc::new(AtomicBool::new(false)),
check_now,
last_new_header_height,
last_new_header: None,
last_new_header_when: None,
deactivated_headers,
join_handle: None,
run_async_setup: true,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::services::types::BlockHeader;
#[allow(dead_code)]
struct MockServices {
chain: Chain,
}
#[async_trait::async_trait]
impl WalletServices for MockServices {
fn chain(&self) -> Chain {
self.chain.clone()
}
async fn get_chain_tracker(
&self,
) -> WalletResult<Box<dyn bsv::transaction::chain_tracker::ChainTracker>> {
Err(WalletError::NotImplemented("mock".into()))
}
async fn get_merkle_path(
&self,
_txid: &str,
_use_next: bool,
) -> crate::services::types::GetMerklePathResult {
crate::services::types::GetMerklePathResult::default()
}
async fn get_raw_tx(
&self,
_txid: &str,
_use_next: bool,
) -> crate::services::types::GetRawTxResult {
crate::services::types::GetRawTxResult::default()
}
async fn post_beef(
&self,
_beef: &[u8],
_txids: &[String],
) -> Vec<crate::services::types::PostBeefResult> {
vec![]
}
async fn get_utxo_status(
&self,
_output: &str,
_output_format: Option<crate::services::types::GetUtxoStatusOutputFormat>,
_outpoint: Option<&str>,
_use_next: bool,
) -> crate::services::types::GetUtxoStatusResult {
crate::services::types::GetUtxoStatusResult {
name: "mock".to_string(),
status: "error".to_string(),
error: Some("mock".to_string()),
is_utxo: None,
details: vec![],
}
}
async fn get_status_for_txids(
&self,
_txids: &[String],
_use_next: bool,
) -> crate::services::types::GetStatusForTxidsResult {
crate::services::types::GetStatusForTxidsResult {
name: "mock".to_string(),
status: "error".to_string(),
error: Some("mock".to_string()),
results: vec![],
}
}
async fn get_script_hash_history(
&self,
_hash: &str,
_use_next: bool,
) -> crate::services::types::GetScriptHashHistoryResult {
crate::services::types::GetScriptHashHistoryResult {
name: "mock".to_string(),
status: "error".to_string(),
error: Some("mock".to_string()),
history: vec![],
}
}
async fn hash_to_header(&self, _hash: &str) -> WalletResult<BlockHeader> {
Err(WalletError::NotImplemented("mock".into()))
}
async fn get_header_for_height(&self, _height: u32) -> WalletResult<Vec<u8>> {
Err(WalletError::NotImplemented("mock".into()))
}
async fn get_height(&self) -> WalletResult<u32> {
Ok(800000)
}
async fn n_lock_time_is_final(
&self,
_input: crate::services::types::NLockTimeInput,
) -> WalletResult<bool> {
Ok(true)
}
async fn get_bsv_exchange_rate(
&self,
) -> WalletResult<crate::services::types::BsvExchangeRate> {
Err(WalletError::NotImplemented("mock".into()))
}
async fn get_fiat_exchange_rate(
&self,
_currency: &str,
_base: Option<&str>,
) -> WalletResult<f64> {
Ok(1.0)
}
async fn get_fiat_exchange_rates(
&self,
_target: &[String],
) -> WalletResult<crate::services::types::FiatExchangeRates> {
Err(WalletError::NotImplemented("mock".into()))
}
fn get_services_call_history(
&self,
_reset: bool,
) -> crate::services::types::ServicesCallHistory {
crate::services::types::ServicesCallHistory { services: vec![] }
}
async fn get_beef_for_txid(&self, _txid: &str) -> WalletResult<bsv::transaction::Beef> {
Err(WalletError::NotImplemented("mock".into()))
}
fn hash_output_script(&self, _script: &[u8]) -> String {
String::new()
}
async fn is_utxo(
&self,
_locking_script: &[u8],
_txid: &str,
_vout: u32,
) -> WalletResult<bool> {
Ok(false)
}
}
#[test]
fn test_monitor_builder_validates_required_fields() {
let result = MonitorBuilder::new().build();
assert!(result.is_err());
match result {
Err(e) => assert!(
e.to_string().contains("chain"),
"Expected chain error, got: {}",
e
),
Ok(_) => panic!("Expected error for missing chain"),
}
let result = MonitorBuilder::new().chain(Chain::Test).build();
assert!(result.is_err());
match result {
Err(e) => assert!(
e.to_string().contains("storage"),
"Expected storage error, got: {}",
e
),
Ok(_) => panic!("Expected error for missing storage"),
}
}
#[test]
fn test_time_constants() {
assert_eq!(ONE_SECOND, 1000);
assert_eq!(ONE_MINUTE, 60_000);
assert_eq!(ONE_HOUR, 3_600_000);
assert_eq!(ONE_DAY, 86_400_000);
assert_eq!(ONE_WEEK, 604_800_000);
}
#[test]
fn test_default_purge_params() {
let params = default_purge_params();
assert!(!params.purge_spent);
assert!(!params.purge_completed);
assert!(params.purge_failed);
assert_eq!(params.purge_spent_age, 2 * ONE_WEEK);
assert_eq!(params.purge_completed_age, 2 * ONE_WEEK);
assert_eq!(params.purge_failed_age, 5 * ONE_DAY);
}
#[test]
fn test_deactivated_header() {
let header = BlockHeader {
version: 1,
previous_hash: "0000".to_string(),
merkle_root: "abcd".to_string(),
time: 1234567890,
bits: 0x1d00ffff,
nonce: 42,
height: 100,
hash: "blockhash".to_string(),
};
let dh = DeactivatedHeader {
when_msecs: 1000,
tries: 0,
header: header.clone(),
};
assert_eq!(dh.when_msecs, 1000);
assert_eq!(dh.tries, 0);
assert_eq!(dh.header.height, 100);
}
#[test]
fn test_now_msecs_returns_reasonable_value() {
let now = now_msecs();
assert!(now > 1_577_836_800_000);
}
#[tokio::test]
async fn test_process_reorg_adds_deactivated_headers() {
let queue: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let header = BlockHeader {
version: 1,
previous_hash: "0000".to_string(),
merkle_root: "abcd".to_string(),
time: 1234567890,
bits: 0x1d00ffff,
nonce: 42,
height: 100,
hash: "blockhash".to_string(),
};
{
let mut q = queue.lock().await;
q.push(DeactivatedHeader {
when_msecs: now_msecs(),
tries: 0,
header: header.clone(),
});
}
let q = queue.lock().await;
assert_eq!(q.len(), 1);
assert_eq!(q[0].header.height, 100);
assert_eq!(q[0].tries, 0);
}
}