use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use crate::error::WalletError;
use crate::monitor::helpers::get_proofs;
use crate::monitor::{AsyncCallback, ONE_HOUR};
use crate::services::traits::WalletServices;
use crate::status::ProvenTxReqStatus;
use crate::storage::find_args::{FindProvenTxReqsArgs, Paged, ProvenTxReqPartial};
use crate::storage::manager::WalletStorageManager;
use crate::types::Chain;
use super::super::task_trait::WalletMonitorTask;
pub struct TaskCheckForProofs {
storage: Arc<WalletStorageManager>,
services: Arc<dyn WalletServices>,
trigger_msecs: u64,
last_run_msecs: u64,
check_now: Arc<AtomicBool>,
on_tx_proven: Option<AsyncCallback<String>>,
chain: Chain,
unproven_attempts_limit: u32,
last_new_header_height: Arc<AtomicU32>,
}
impl TaskCheckForProofs {
pub fn new(
storage: Arc<WalletStorageManager>,
services: Arc<dyn WalletServices>,
chain: Chain,
check_now: Arc<AtomicBool>,
unproven_attempts_limit: u32,
on_tx_proven: Option<AsyncCallback<String>>,
last_new_header_height: Arc<AtomicU32>,
) -> Self {
Self {
storage,
services,
trigger_msecs: 2 * ONE_HOUR,
last_run_msecs: 0,
check_now,
on_tx_proven,
chain,
unproven_attempts_limit,
last_new_header_height,
}
}
#[allow(clippy::too_many_arguments)]
pub fn with_trigger_msecs(
storage: Arc<WalletStorageManager>,
services: Arc<dyn WalletServices>,
chain: Chain,
check_now: Arc<AtomicBool>,
trigger_msecs: u64,
unproven_attempts_limit: u32,
on_tx_proven: Option<AsyncCallback<String>>,
last_new_header_height: Arc<AtomicU32>,
) -> Self {
Self {
storage,
services,
trigger_msecs,
last_run_msecs: 0,
check_now,
on_tx_proven,
chain,
unproven_attempts_limit,
last_new_header_height,
}
}
}
#[async_trait]
impl WalletMonitorTask for TaskCheckForProofs {
fn storage_manager(&self) -> Option<&WalletStorageManager> {
Some(&self.storage)
}
fn name(&self) -> &str {
"CheckForProofs"
}
fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
let check_now = self.check_now.load(Ordering::SeqCst);
let timer_expired = self.trigger_msecs > 0
&& now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs;
if check_now || timer_expired {
self.last_run_msecs = now_msecs_since_epoch;
true
} else {
false
}
}
async fn run_task(&mut self) -> Result<String, WalletError> {
let mut log = String::new();
let counts_as_attempt = self.check_now.load(Ordering::SeqCst);
self.check_now.store(false, Ordering::SeqCst);
let raw_height = self.last_new_header_height.load(Ordering::SeqCst);
let max_acceptable_height = if raw_height == u32::MAX {
return Ok(log);
} else {
Some(raw_height)
};
let limit = 100i64;
let mut offset = 0i64;
let statuses = vec![
ProvenTxReqStatus::Callback,
ProvenTxReqStatus::Unmined,
ProvenTxReqStatus::Sending,
ProvenTxReqStatus::Unknown,
ProvenTxReqStatus::Unconfirmed,
];
loop {
let args = FindProvenTxReqsArgs {
partial: ProvenTxReqPartial::default(),
statuses: Some(statuses.clone()),
paged: Some(Paged { limit, offset }),
since: None,
};
let reqs = self.storage.find_proven_tx_reqs(&args).await?;
let count = reqs.len();
if reqs.is_empty() {
break;
}
log.push_str(&format!(
"{} reqs with status 'callback', 'unmined', 'sending', 'unknown', or 'unconfirmed'\n",
reqs.len()
));
let r = get_proofs(
&self.storage,
self.services.as_ref(),
&reqs,
&self.chain,
self.unproven_attempts_limit,
counts_as_attempt,
max_acceptable_height,
)
.await?;
log.push_str(&r.log);
if let Some(ref cb) = self.on_tx_proven {
for proven_req in &r.proven {
cb(proven_req.txid.clone()).await;
}
}
if (count as i64) < limit {
break;
}
offset += limit;
}
Ok(log)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_check_now_triggers_run() {
let check_now = Arc::new(AtomicBool::new(true));
assert!(check_now.load(Ordering::SeqCst));
check_now.store(false, Ordering::SeqCst);
assert!(!check_now.load(Ordering::SeqCst));
}
#[test]
fn test_default_trigger_interval() {
assert_eq!(2 * ONE_HOUR, 7_200_000);
}
#[test]
fn test_task_name() {
assert_eq!("CheckForProofs", "CheckForProofs");
}
}