use std::sync::Arc;
use async_trait::async_trait;
use crate::error::WalletError;
use crate::monitor::helpers::now_msecs;
use crate::monitor::{DeactivatedHeader, ONE_MINUTE, ONE_SECOND};
use crate::services::traits::WalletServices;
use crate::storage::find_args::{FindProvenTxsArgs, ProvenTxPartial};
use crate::storage::manager::WalletStorageManager;
use crate::tables::ProvenTx;
use super::super::task_trait::WalletMonitorTask;
/// Monitor task that repairs merkle proofs after a blockchain reorg.
///
/// Another component pushes headers deactivated by a reorg into
/// `deactivated_headers`. Once a header has aged past `aged_msecs`, this
/// task finds the `proven_txs` records anchored to that block and asks
/// `services` for fresh merkle paths, updating storage where possible.
pub struct TaskReorg {
    /// Storage backend used to find and update `proven_txs` records.
    storage: Arc<WalletStorageManager>,
    /// External services used to fetch fresh merkle paths.
    services: Arc<dyn WalletServices>,
    /// How long (ms) a deactivated header must sit in the queue before it
    /// is processed.
    aged_msecs: u64,
    /// Maximum number of re-queue attempts per header before giving up.
    max_retries: u32,
    /// Time (ms since epoch) `trigger` last returned true; throttles runs.
    last_run_msecs: u64,
    /// Minimum interval (ms) between two task runs.
    trigger_msecs: u64,
    /// Shared queue of reorg-deactivated headers, filled elsewhere.
    deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
    /// Headers moved out of the queue by `trigger`, consumed by `run_task`.
    process: Vec<DeactivatedHeader>,
}
impl TaskReorg {
    /// Creates a `TaskReorg` with the default intervals: headers must age
    /// 10 minutes before processing, each header is retried at most 3
    /// times, and the task triggers at most once every 30 seconds.
    pub fn new(
        storage: Arc<WalletStorageManager>,
        services: Arc<dyn WalletServices>,
        deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
    ) -> Self {
        // Delegate to `with_intervals` so the field initialization and the
        // default values live in exactly one place.
        Self::with_intervals(
            storage,
            services,
            deactivated_headers,
            ONE_MINUTE * 10,
            3,
            ONE_SECOND * 30,
        )
    }

    /// Creates a `TaskReorg` with caller-supplied intervals.
    ///
    /// * `aged_msecs` - how long (ms) a deactivated header must age in the
    ///   queue before it is processed.
    /// * `max_retries` - maximum re-queue attempts per header.
    /// * `trigger_msecs` - minimum interval (ms) between task runs.
    pub fn with_intervals(
        storage: Arc<WalletStorageManager>,
        services: Arc<dyn WalletServices>,
        deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
        aged_msecs: u64,
        max_retries: u32,
        trigger_msecs: u64,
    ) -> Self {
        Self {
            storage,
            services,
            aged_msecs,
            max_retries,
            last_run_msecs: 0,
            trigger_msecs,
            deactivated_headers,
            process: Vec::new(),
        }
    }
}
#[async_trait]
impl WalletMonitorTask for TaskReorg {
    fn storage_manager(&self) -> Option<&WalletStorageManager> {
        Some(&self.storage)
    }

    fn name(&self) -> &str {
        "Reorg"
    }

    /// Decides whether the task should run now.
    ///
    /// Moves every queued header older than `aged_msecs` into
    /// `self.process`, then returns `true` (recording the run time) when
    /// there is work to do. Uses `try_lock` because this method is
    /// synchronous; if the queue is contended, aging is simply deferred to
    /// the next trigger.
    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
        // Throttle: run at most once per `trigger_msecs`. `saturating_add`
        // guards the (theoretical) u64 overflow, matching the
        // `saturating_sub` used for the cutoff below.
        if now_msecs_since_epoch <= self.last_run_msecs.saturating_add(self.trigger_msecs) {
            return false;
        }
        let cutoff = now_msecs_since_epoch.saturating_sub(self.aged_msecs);
        if let Ok(mut queue) = self.deactivated_headers.try_lock() {
            let mut remaining = Vec::new();
            for header in queue.drain(..) {
                if header.when_msecs < cutoff {
                    // Old enough: schedule it for processing.
                    self.process.push(header);
                } else {
                    remaining.push(header);
                }
            }
            *queue = remaining;
        }
        if !self.process.is_empty() {
            self.last_run_msecs = now_msecs_since_epoch;
            true
        } else {
            false
        }
    }

    /// Processes every header collected by `trigger`.
    ///
    /// For each deactivated block, finds the `proven_txs` rows anchored to
    /// it and asks `services` for a fresh merkle path. Rows that can be
    /// re-proven are repointed at the replacement block in storage. A
    /// header with rows that could not be updated is re-queued with an
    /// incremented `tries` until `max_retries` is reached. Returns a
    /// human-readable log of the run.
    async fn run_task(&mut self) -> Result<String, WalletError> {
        let mut log = String::new();
        while let Some(header) = self.process.pop() {
            log.push_str(&format!(
                " processing deactivated header height={} hash={} tries={}\n",
                header.header.height, header.header.hash, header.tries
            ));
            // Look up all proven_txs records anchored to the deactivated block.
            let find_args = FindProvenTxsArgs {
                partial: ProvenTxPartial {
                    block_hash: Some(header.header.hash.clone()),
                    ..Default::default()
                },
                since: None,
                paged: None,
            };
            let proven_txs = match self.storage.find_proven_txs(&find_args).await {
                Ok(txs) => txs,
                Err(e) => {
                    // Best effort: log and move on to the next header.
                    log.push_str(&format!(" error finding proven txs: {}\n", e));
                    continue;
                }
            };
            if proven_txs.is_empty() {
                log.push_str(" no matching proven_txs records\n");
                continue;
            }
            log.push_str(&format!(
                " found {} proven_txs records for block {}\n",
                proven_txs.len(),
                header.header.hash
            ));
            let mut unavailable_count = 0;
            for ptx in &proven_txs {
                let gmpr = self.services.get_merkle_path(&ptx.txid, false).await;
                // NOTE(review): the fresh `_merkle_path` is fetched but never
                // persisted (a dead `ProvenTx` draft that carried it has been
                // removed) — confirm whether `ProvenTxPartial` should also
                // carry the new merkle path and index.
                if let (Some(_merkle_path), Some(new_header)) = (&gmpr.merkle_path, &gmpr.header) {
                    // A fresh proof exists: repoint the record at the block
                    // that replaced the deactivated one.
                    let update = crate::storage::find_args::ProvenTxPartial {
                        proven_tx_id: Some(ptx.proven_tx_id),
                        height: Some(new_header.height as i32),
                        block_hash: Some(new_header.hash.clone()),
                        ..Default::default()
                    };
                    match self
                        .storage
                        .update_proven_tx(ptx.proven_tx_id, &update)
                        .await
                    {
                        Ok(_) => {
                            log.push_str(&format!(
                                " updated proof for txid {} to height {}\n",
                                ptx.txid, new_header.height
                            ));
                        }
                        Err(e) => {
                            log.push_str(&format!(
                                " error updating proof for txid {}: {}\n",
                                ptx.txid, e
                            ));
                            unavailable_count += 1;
                        }
                    }
                } else {
                    log.push_str(&format!(
                        " no updated proof available for txid {}\n",
                        ptx.txid
                    ));
                    unavailable_count += 1;
                }
            }
            if unavailable_count > 0 {
                if header.tries + 1 >= self.max_retries {
                    log.push_str(&format!(
                        " maximum retries {} exceeded\n",
                        self.max_retries
                    ));
                } else {
                    log.push_str(" retrying...\n");
                    // Await the tokio mutex instead of `try_lock`: a
                    // contended `try_lock` here would silently drop the
                    // retry. This is an async fn, so awaiting is safe.
                    let mut queue = self.deactivated_headers.lock().await;
                    queue.push(DeactivatedHeader {
                        header: header.header.clone(),
                        when_msecs: now_msecs(),
                        tries: header.tries + 1,
                    });
                }
            }
        }
        Ok(log)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::services::types::BlockHeader;

    // Sanity check: the default intervals used by `TaskReorg::new`
    // (10-minute aging, 30-second trigger) expressed in milliseconds.
    #[test]
    fn test_default_intervals() {
        assert_eq!(ONE_MINUTE * 10, 600_000);
        assert_eq!(ONE_SECOND * 30, 30_000);
    }

    // NOTE(review): this asserts a literal against itself and can never
    // fail. It should call `name()` on a constructed `TaskReorg`, but
    // building one requires storage/services fakes not available here —
    // replace once test doubles exist.
    #[test]
    fn test_task_name() {
        assert_eq!("Reorg", "Reorg");
    }

    // Verifies that a `DeactivatedHeader` pushed into the shared queue is
    // readable back with its fields intact. Exercises the queue plumbing
    // only; no `TaskReorg` is constructed.
    #[tokio::test]
    async fn test_deactivated_header_aging() {
        let queue: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
            Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let header = BlockHeader {
            version: 1,
            previous_hash: "0000".to_string(),
            merkle_root: "abcd".to_string(),
            time: 1234567890,
            bits: 0x1d00ffff,
            nonce: 42,
            height: 100,
            hash: "blockhash".to_string(),
        };
        // Scope the first lock guard so it is released before re-reading.
        {
            let mut q = queue.lock().await;
            q.push(DeactivatedHeader {
                when_msecs: 1000, tries: 0,
                header: header.clone(),
            });
        }
        let q = queue.lock().await;
        assert_eq!(q.len(), 1);
        assert_eq!(q[0].tries, 0);
    }
}