use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use log::{debug, error, info, trace};
use monero::{
cryptonote::{hash::Hashable, onetime_key::SubKeyChecker},
Transaction, VarInt,
};
use tokio::{join, sync::Mutex as AsyncMutex};
use crate::{
caching::{BlockCache, TxpoolCache},
invoice::Transfer,
invoices_db::InvoicesDb,
rpc::RpcClient,
AcceptXmrError, SubIndex,
};
pub(crate) struct Scanner {
invoices_db: InvoicesDb,
block_cache: AsyncMutex<BlockCache>,
txpool_cache: AsyncMutex<TxpoolCache>,
first_scan: bool,
}
impl Scanner {
pub async fn new(
rpc_client: RpcClient,
invoices_db: InvoicesDb,
block_cache_size: usize,
atomic_cache_height: Arc<AtomicU64>,
atomic_daemon_height: Arc<AtomicU64>,
) -> Result<Scanner, AcceptXmrError> {
let daemon_height = rpc_client.daemon_height().await?;
let cache_height = match invoices_db.lowest_height() {
Ok(Some(h)) => {
info!("Pending invoices found in AcceptXMR database. Resuming from last block scanned: {}", h);
h - 1
}
Ok(None) => {
trace!("Retrieving daemon hight for scanner setup.");
let h = daemon_height;
info!("No pending invoices found in AcceptXMR database. Skipping to blockchain tip: {}", h);
h - 1
}
Err(e) => return Err(e)?,
};
atomic_cache_height.store(cache_height, Ordering::Relaxed);
atomic_daemon_height.store(daemon_height, Ordering::Relaxed);
let (block_cache, txpool_cache) = join!(
BlockCache::init(
rpc_client.clone(),
block_cache_size,
atomic_cache_height,
atomic_daemon_height
),
TxpoolCache::init(rpc_client.clone())
);
Ok(Scanner {
invoices_db,
block_cache: AsyncMutex::new(block_cache?),
txpool_cache: AsyncMutex::new(txpool_cache?),
first_scan: true,
})
}
pub async fn scan(
&mut self,
sub_key_checker: &SubKeyChecker<'_>,
) -> Result<(), AcceptXmrError> {
let (blocks_updated, new_transactions) = self.update_caches().await?;
let (blocks_amounts_or_err, txpool_amounts_or_err) = join!(
self.scan_blocks(sub_key_checker, blocks_updated),
self.scan_txpool(sub_key_checker, &new_transactions)
);
let block_cache_height = self.block_cache.lock().await.height.load(Ordering::Relaxed);
let blocks_amounts = match blocks_amounts_or_err {
Ok(amts) => amts,
Err(e) => {
error!("Skipping scan! Encountered a problem while updating or scanning the block cache: {}", e);
return Err(e);
}
};
let txpool_amounts = match txpool_amounts_or_err {
Ok(amts) => amts,
Err(e) => {
error!("Skipping scan! Encountered a problem while updating or scanning the txpool cache: {}", e);
return Err(e);
}
};
let transfers: Vec<(SubIndex, Transfer)> = blocks_amounts
.into_iter()
.chain(txpool_amounts.into_iter())
.collect();
if self.first_scan {
self.first_scan = false;
}
let deepest_update = block_cache_height - blocks_updated as u64 + 1;
let mut updated_invoices = Vec::new();
for invoice_or_err in self.invoices_db.iter() {
let old_invoice = match invoice_or_err {
Ok(p) => p,
Err(e) => {
error!(
"Failed to retrieve old invoice object from database while iterating through database: {}", e
);
continue;
}
};
let mut invoice = old_invoice.clone();
invoice.transfers.retain(|transfer| {
transfer
.cmp_by_height(&Transfer::new(0, Some(deepest_update)))
.is_lt()
});
for (sub_index, owned_transfer) in &transfers {
if sub_index == &invoice.index()
&& owned_transfer
.cmp_by_height(&Transfer::new(0, Some(invoice.creation_height() - 1)))
.is_gt()
{
invoice.transfers.push(*owned_transfer);
}
}
if invoice.current_height != block_cache_height + 1 {
invoice.current_height = block_cache_height + 1;
}
if invoice != old_invoice {
invoice.paid_height = None;
invoice.amount_paid = 0;
for transfer in &invoice.transfers {
invoice.amount_paid += transfer.amount;
if invoice.amount_paid >= invoice.amount_requested()
&& invoice.paid_height.is_none()
{
invoice.paid_height = transfer.height;
}
}
updated_invoices.push(invoice);
}
}
for invoice in updated_invoices {
debug!(
"Invoice update for subaddress index {}: \
\n{}",
invoice.index(),
invoice
);
if let Err(e) = self.invoices_db.update(invoice.id(), &invoice) {
error!(
"Failed to save update to invoice for index {} to database: {}",
invoice.index(),
e
);
}
}
self.invoices_db.flush()?;
Ok(())
}
async fn update_caches(&self) -> Result<(usize, Vec<Transaction>), AcceptXmrError> {
let mut block_cache = self.block_cache.lock().await;
let blocks_updated = if self.invoices_db.is_empty() {
block_cache.skip_ahead().await?
} else {
block_cache.update().await?
};
let mut txpool_cache = self.txpool_cache.lock().await;
let new_transactions = txpool_cache.update().await?;
Ok((blocks_updated, new_transactions))
}
async fn scan_blocks(
&self,
sub_key_checker: &SubKeyChecker<'_>,
mut blocks_updated: usize,
) -> Result<Vec<(SubIndex, Transfer)>, AcceptXmrError> {
let block_cache = self.block_cache.lock().await;
if self.first_scan {
blocks_updated = block_cache.blocks.len();
}
let mut transfers = Vec::new();
for i in (0..blocks_updated).rev() {
let transactions = &block_cache.blocks[i].3;
let amounts_received = self.scan_transactions(transactions, sub_key_checker)?;
trace!(
"Scanned {} transactions from block {}, and found {} transactions to tracked invoices",
transactions.len(),
block_cache.blocks[i].1,
amounts_received.len()
);
let block_cache_height: u64 = block_cache.height.load(Ordering::Relaxed) - i as u64;
transfers.extend::<Vec<(SubIndex, Transfer)>>(
amounts_received
.into_iter()
.flat_map(|(_, amounts)| amounts)
.map(|amount| (amount.0, Transfer::new(amount.1, Some(block_cache_height))))
.collect(),
);
}
Ok(transfers)
}
async fn scan_txpool(
&self,
sub_key_checker: &SubKeyChecker<'_>,
new_transactions: &[Transaction],
) -> Result<Vec<(SubIndex, Transfer)>, AcceptXmrError> {
let mut txpool_cache = self.txpool_cache.lock().await;
let discovered_transfers = txpool_cache.discovered_transfers();
let amounts_received = self.scan_transactions(new_transactions, sub_key_checker)?;
trace!(
"Scanned {} transactions from txpool, and found {} transfers for tracked invoices",
new_transactions.len(),
amounts_received.len()
);
let new_transfers: HashMap<monero::Hash, Vec<(SubIndex, Transfer)>> = amounts_received
.iter()
.map(|(hash, amounts)| {
(
*hash,
amounts
.iter()
.map(|(sub_index, amount)| (*sub_index, Transfer::new(*amount, None)))
.collect(),
)
})
.collect();
let mut transfers: HashMap<monero::Hash, Vec<(SubIndex, Transfer)>> = new_transfers.clone();
transfers.extend(discovered_transfers.clone());
txpool_cache.insert_transfers(&new_transfers);
Ok(transfers
.into_iter()
.flat_map(|(_, amounts)| amounts)
.collect())
}
fn scan_transactions(
&self,
transactions: &[monero::Transaction],
sub_key_checker: &SubKeyChecker,
) -> Result<HashMap<monero::Hash, Vec<(SubIndex, u64)>>, AcceptXmrError> {
let mut amounts_received = HashMap::new();
for tx in transactions {
if tx.prefix().unlock_time != VarInt(0) {
continue;
}
let transfers = tx.check_outputs_with(sub_key_checker)?;
for transfer in &transfers {
let sub_index = SubIndex::from(transfer.sub_index());
if self.invoices_db.contains_sub_index(sub_index)? {
let amount = transfers[0]
.amount()
.ok_or(AcceptXmrError::Unblind(sub_index))?;
amounts_received
.entry(tx.hash())
.or_insert_with(Vec::new)
.push((sub_index, amount.as_pico()));
}
}
}
Ok(amounts_received.into_iter().collect())
}
}