use std::{
ops::Deref,
str::FromStr,
sync::atomic::{self, AtomicU32, AtomicU64},
sync::{mpsc, Arc, Mutex, PoisonError},
thread::{self, JoinHandle},
time::Duration,
};
use hyper::Uri;
use log::{debug, error, info, warn};
use monero::cryptonote::onetime_key::SubKeyChecker;
use tokio::{join, runtime::Runtime, time};
use crate::{
caching::SubaddressCache,
invoices_db::InvoicesDb,
rpc::RpcClient,
scanner::Scanner,
subscriber::Subscriber,
{AcceptXmrError, Invoice, InvoiceId},
};
/// Scan interval a `PaymentGatewayBuilder` starts with unless `scan_interval` overrides it.
const DEFAULT_SCAN_INTERVAL: Duration = Duration::from_millis(1000);
/// Daemon URL a `PaymentGatewayBuilder` starts with unless `daemon_url` overrides it.
const DEFAULT_DAEMON: &str = "http://node.moneroworld.com:18089";
/// RPC connection timeout a `PaymentGatewayBuilder` starts with unless
/// `rpc_connection_timeout` overrides it.
const DEFAULT_RPC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
/// Total RPC timeout a `PaymentGatewayBuilder` starts with unless `rpc_timeout` overrides it.
const DEFAULT_RPC_TOTAL_TIMEOUT: Duration = Duration::from_secs(10);
/// Database path a `PaymentGatewayBuilder` starts with unless `db_path` overrides it.
const DEFAULT_DB_PATH: &str = "AcceptXMR_DB";
/// Block cache size handed to `Scanner::new` when the gateway is run.
// NOTE(review): presumably counted in blocks -- confirm against `Scanner`.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 10;
/// Handle to the payment gateway.
///
/// This is a thin wrapper around an [`Arc`] of [`PaymentGatewayInner`], so cloning it only
/// clones the `Arc` and every clone shares the same inner state.
#[derive(Clone)]
pub struct PaymentGateway(pub(crate) Arc<PaymentGatewayInner>);
/// Shared state behind a [`PaymentGateway`]. Reached through `PaymentGateway`'s `Deref` impl.
#[doc(hidden)]
pub struct PaymentGatewayInner {
// RPC client for the daemon; cloned into the scanner when the gateway is run.
rpc_client: RpcClient,
// View pair built from the private view key and the primary address's public spend key.
viewpair: monero::ViewPair,
// Period of the interval the scanning loop ticks on.
scan_interval: Duration,
// Database of tracked invoices; a clone is handed to the scanner.
invoices_db: InvoicesDb,
// Subaddress pool: new invoices take a subaddress out, removed invoices put theirs back.
subaddresses: Mutex<SubaddressCache>,
// Shared with the subaddress cache; the scanning thread reads it to size its `SubKeyChecker`.
highest_minor_index: Arc<AtomicU32>,
// Shared with the scanner; read back by `cache_height`.
block_cache_height: Arc<AtomicU64>,
// Shared with the scanner; read by `new_invoice` as the invoice's creation height.
cached_daemon_height: Arc<AtomicU64>,
// Join handle of the scanning thread. `None` before the first `run` and after `stop` or
// `status` has taken it.
scanner_handle: Mutex<Option<JoinHandle<Result<(), AcceptXmrError>>>>,
// Both ends of the command channel, despite the field name: `.0` is the sender used by
// `stop`, `.1` is the receiver shared with the scanning thread.
scanner_command_sender: (
Mutex<mpsc::Sender<MessageToScanner>>,
Arc<Mutex<mpsc::Receiver<MessageToScanner>>>,
),
}
impl Deref for PaymentGateway {
type Target = PaymentGatewayInner;
fn deref(&self) -> &PaymentGatewayInner {
&self.0
}
}
impl PaymentGateway {
/// Returns a [`PaymentGatewayBuilder`] for the given private view key and primary address.
///
/// Equivalent to calling [`PaymentGatewayBuilder::new`] with the same arguments.
#[must_use]
pub fn builder(private_view_key: String, primary_address: String) -> PaymentGatewayBuilder {
PaymentGatewayBuilder::new(private_view_key, primary_address)
}
#[allow(clippy::range_plus_one, clippy::missing_panics_doc)]
pub async fn run(&self) -> Result<(), AcceptXmrError> {
{
let scanner_handle = self
.scanner_handle
.lock()
.unwrap_or_else(PoisonError::into_inner);
if let Some(handle) = scanner_handle.as_ref() {
if !handle.is_finished() {
return Err(AcceptXmrError::AlreadyRunning);
}
};
}
let rpc_client = self.rpc_client.clone();
let viewpair = self.viewpair;
let scan_interval = self.scan_interval;
let highest_minor_index = self.highest_minor_index.clone();
let block_cache_height = self.block_cache_height.clone();
let cached_daemon_height = self.cached_daemon_height.clone();
let pending_invoices = self.invoices_db.clone();
let command_receiver = self.scanner_command_sender.1.clone();
debug!("Creating blockchain scanner");
let mut scanner = Scanner::new(
rpc_client,
pending_invoices,
DEFAULT_BLOCK_CACHE_SIZE,
block_cache_height,
cached_daemon_height,
)
.await?;
info!("Starting blockchain scanner");
*self.scanner_handle.lock().unwrap_or_else(PoisonError::into_inner) = Some(thread::Builder::new()
.name("Scanning Thread".to_string())
.spawn(move || -> Result<(), AcceptXmrError> {
let tokio_runtime = Runtime::new()?;
tokio_runtime.block_on(async move {
let mut sub_key_checker = SubKeyChecker::new(
&viewpair,
1..2,
0..highest_minor_index.load(atomic::Ordering::Relaxed) + 1,
);
let mut blockscan_interval = time::interval(scan_interval);
loop {
match command_receiver.lock().unwrap_or_else(PoisonError::into_inner).try_recv() {
Ok(MessageToScanner::Stop) => {
info!("Scanner received stop signal. Stopping gracefully");
break;
}
Err(mpsc::TryRecvError::Empty) => {
}
Err(mpsc::TryRecvError::Disconnected) => {
error!("Scanner lost connection to payment gateway. Stopping gracefully.");
break;
}
}
if sub_key_checker.table.len()
<= highest_minor_index.load(atomic::Ordering::Relaxed) as usize
{
sub_key_checker = SubKeyChecker::new(
&viewpair,
1..2,
0..highest_minor_index.load(atomic::Ordering::Relaxed) + 1,
);
}
if let (_, Err(e)) = join!(blockscan_interval.tick(), scanner.scan(&sub_key_checker)) {
error!("Payment gateway encountered an error while scanning for payments: {}", e);
};
}
});
Ok(())
})?);
debug!("Scanner started successfully");
Ok(())
}
#[must_use]
pub fn status(&self) -> PaymentGatewayStatus {
let scanner_handle = self
.scanner_handle
.lock()
.unwrap_or_else(PoisonError::into_inner);
match scanner_handle.as_ref() {
None => PaymentGatewayStatus::NotRunning,
Some(handle) if handle.is_finished() => {
let owned_handle = self
.scanner_handle
.lock()
.unwrap_or_else(PoisonError::into_inner)
.take();
match owned_handle.map(std::thread::JoinHandle::join) {
None | Some(Ok(Ok(_))) => PaymentGatewayStatus::NotRunning,
Some(Ok(Err(e))) => {
PaymentGatewayStatus::Error(AcceptXmrError::ScanningThread(Box::new(e)))
}
Some(Err(_)) => {
PaymentGatewayStatus::Error(AcceptXmrError::ScanningThreadPanic)
}
}
}
Some(_) => PaymentGatewayStatus::Running,
}
}
/// Stops the scanning thread, if there is one, and waits for it to exit.
///
/// The stored thread handle is taken. If the thread has not finished yet, a
/// [`MessageToScanner::Stop`] command is sent first; either way the thread is then joined.
///
/// # Errors
///
/// * [`AcceptXmrError::StopSignal`] if the stop command could not be sent.
/// * [`AcceptXmrError::ScanningThread`] if the thread returned an error.
/// * [`AcceptXmrError::ScanningThreadPanic`] if the thread panicked.
pub fn stop(&self) -> Result<(), AcceptXmrError> {
    // The handle lock stays held until this function returns, i.e. while the stop command
    // is sent and the thread is joined.
    let mut handle_slot = self
        .scanner_handle
        .lock()
        .unwrap_or_else(PoisonError::into_inner);

    let thread = match handle_slot.take() {
        Some(thread) => thread,
        // Nothing was ever started, or the handle has already been consumed.
        None => return Ok(()),
    };

    // A finished thread only needs joining; a live one must be told to stop first.
    if !thread.is_finished() {
        let stop_sender = self
            .scanner_command_sender
            .0
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        stop_sender
            .send(MessageToScanner::Stop)
            .map_err(|e| AcceptXmrError::StopSignal(e.to_string()))?;
    }

    match thread.join() {
        Ok(Ok(_)) => Ok(()),
        Ok(Err(e)) => Err(AcceptXmrError::ScanningThread(Box::new(e))),
        Err(_) => Err(AcceptXmrError::ScanningThreadPanic),
    }
}
/// Creates a new invoice, stores it in the invoice database and returns its ID.
///
/// A subaddress is taken out of the subaddress cache via `remove_random`, and the invoice's
/// creation height is the currently cached daemon height.
///
/// * `piconeros` - amount passed to `Invoice::new`.
/// * `confirmations_required` - passed through to `Invoice::new`.
/// * `expiration_in` - passed through to `Invoice::new`.
///   NOTE(review): presumably a number of blocks -- confirm against `Invoice`.
/// * `description` - passed through to `Invoice::new`.
///
/// # Errors
///
/// Returns an error if the invoice cannot be inserted into the database.
pub async fn new_invoice(
    &self,
    piconeros: u64,
    confirmations_required: u64,
    expiration_in: u64,
    description: String,
) -> Result<InvoiceId, AcceptXmrError> {
    // The cache lock is only held for this one statement.
    let (sub_index, subaddress) = self
        .subaddresses
        .lock()
        .unwrap_or_else(PoisonError::into_inner)
        .remove_random();

    let creation_height = self.cached_daemon_height.load(atomic::Ordering::Relaxed);

    let invoice = Invoice::new(
        subaddress,
        sub_index,
        creation_height,
        piconeros,
        confirmations_required,
        expiration_in,
        description,
    );

    self.invoices_db.insert(&invoice)?;
    debug!(
        "Now tracking invoice to subaddress index {}",
        invoice.index()
    );

    Ok(invoice.id())
}
/// Removes an invoice from the database and returns it, or `None` if it was not there.
///
/// When an invoice is removed its subaddress is put back into the subaddress cache. A
/// warning is logged if the invoice was neither expired nor both confirmed and created
/// below its current height.
///
/// # Errors
///
/// Returns an error if the database removal fails.
pub fn remove_invoice(&self, invoice_id: InvoiceId) -> Result<Option<Invoice>, AcceptXmrError> {
    let removed = self.invoices_db.remove(invoice_id)?;

    if let Some(old) = removed.as_ref() {
        // Removal is unremarkable for expired invoices, and for confirmed invoices that are
        // at least one block old.
        let removal_expected = old.is_expired()
            || (old.is_confirmed() && old.creation_height() < old.current_height());
        if !removal_expected {
            warn!("Removed an invoice which was neither expired, nor fully confirmed and a block or more old. Was this intentional?");
        }

        // Make the subaddress available to future invoices again.
        self.subaddresses
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .insert(invoice_id.sub_index, old.address().to_string());
    }

    Ok(removed)
}
/// Subscribes to the invoice with the given ID by delegating to the invoice database.
///
/// The `Option` is whatever `InvoicesDb::subscribe` returns.
/// NOTE(review): presumably `None` when no such invoice is tracked -- confirm.
///
/// # Errors
///
/// Returns an error if the database lookup fails.
pub fn subscribe(&self, invoice_id: InvoiceId) -> Result<Option<Subscriber>, AcceptXmrError> {
    let subscriber = self.invoices_db.subscribe(invoice_id)?;
    Ok(subscriber)
}
/// Returns a [`Subscriber`] obtained from the invoice database's `subscribe_all`.
// NOTE(review): presumably receives updates for every invoice -- confirm against `InvoicesDb`.
#[must_use]
pub fn subscribe_all(&self) -> Subscriber {
self.invoices_db.subscribe_all()
}
/// Asks the daemon for its height through the RPC client.
///
/// # Errors
///
/// Returns an error if the RPC request fails.
pub async fn daemon_height(&self) -> Result<u64, AcceptXmrError> {
    let height = self.rpc_client.daemon_height().await?;
    Ok(height)
}
#[must_use]
#[doc(hidden)]
pub fn cache_height(&self) -> u64 {
use std::sync::atomic::Ordering;
self.block_cache_height.load(Ordering::Relaxed)
}
/// Looks up the invoice with the given ID in the invoice database.
///
/// The `Option` is whatever `InvoicesDb::get` returns.
///
/// # Errors
///
/// Returns an error if the database lookup fails.
pub fn get_invoice(&self, invoice_id: InvoiceId) -> Result<Option<Invoice>, AcceptXmrError> {
    let invoice = self.invoices_db.get(invoice_id)?;
    Ok(invoice)
}
/// Returns the daemon URL as reported by the RPC client.
#[must_use]
pub fn daemon_url(&self) -> String {
self.rpc_client.url()
}
}
/// Builder for [`PaymentGateway`]. Collects configuration, then `build` turns it into a
/// gateway.
pub struct PaymentGatewayBuilder {
// URL of the daemon; parsed as a `Uri` in `build`.
daemon_url: String,
// Optional daemon login, handed to the RPC client.
daemon_username: Option<String>,
daemon_password: Option<String>,
// Total and connection timeouts, handed to the RPC client.
rpc_timeout: Duration,
rpc_connection_timeout: Duration,
// Private view key; parsed as a `monero::PrivateKey` in `build`.
private_view_key: String,
// Primary address; parsed as a `monero::Address` in `build`, which uses its public spend key.
primary_address: String,
// Interval the scanning loop ticks on.
scan_interval: Duration,
// Path the invoice database is opened at.
db_path: String,
// Optional seed, handed to both the RPC client and the subaddress cache.
seed: Option<u64>,
}
impl PaymentGatewayBuilder {
#[must_use]
pub fn new(private_view_key: String, primary_address: String) -> PaymentGatewayBuilder {
PaymentGatewayBuilder {
daemon_url: DEFAULT_DAEMON.to_string(),
daemon_username: None,
daemon_password: None,
rpc_timeout: DEFAULT_RPC_TOTAL_TIMEOUT,
rpc_connection_timeout: DEFAULT_RPC_CONNECTION_TIMEOUT,
private_view_key,
primary_address,
scan_interval: DEFAULT_SCAN_INTERVAL,
db_path: DEFAULT_DB_PATH.to_string(),
seed: None,
}
}
#[must_use]
pub fn daemon_url(mut self, url: String) -> PaymentGatewayBuilder {
self.daemon_url = url;
self
}
#[must_use]
pub fn daemon_login(mut self, username: String, password: String) -> PaymentGatewayBuilder {
self.daemon_username = Some(username);
self.daemon_password = Some(password);
self
}
#[must_use]
pub fn rpc_timeout(mut self, timeout: Duration) -> PaymentGatewayBuilder {
self.rpc_timeout = timeout;
self
}
#[must_use]
pub fn rpc_connection_timeout(mut self, timeout: Duration) -> PaymentGatewayBuilder {
self.rpc_connection_timeout = timeout;
self
}
#[must_use]
pub fn scan_interval(mut self, interval: Duration) -> PaymentGatewayBuilder {
self.scan_interval = interval;
self
}
#[must_use]
pub fn db_path(mut self, path: String) -> PaymentGatewayBuilder {
self.db_path = path;
self
}
#[must_use]
pub fn seed(mut self, seed: u64) -> PaymentGatewayBuilder {
warn!("Seed set to {}. Some operations intended to be random (like the order in which subaddresses are used) will be predictable.", seed);
self.seed = Some(seed);
self
}
pub fn build(self) -> Result<PaymentGateway, AcceptXmrError> {
let rpc_client = RpcClient::new(
self.daemon_url
.parse::<Uri>()
.map_err(|e| AcceptXmrError::Parse {
datatype: "Uri",
input: self.daemon_url,
error: e.to_string(),
})?,
self.rpc_timeout,
self.rpc_connection_timeout,
self.daemon_username,
self.daemon_password,
self.seed,
);
let invoices_db = InvoicesDb::new(&self.db_path)?;
info!("Opened database in \"{}/\"", self.db_path);
let viewpair = monero::ViewPair {
view: monero::PrivateKey::from_str(&self.private_view_key).map_err(|e| {
AcceptXmrError::Parse {
datatype: "PrivateKey",
input: self.private_view_key.to_string(),
error: e.to_string(),
}
})?,
spend: monero::Address::from_str(&self.primary_address)
.map_err(|e| AcceptXmrError::Parse {
datatype: "Address",
input: self.primary_address.to_string(),
error: e.to_string(),
})?
.public_spend,
};
let highest_minor_index = Arc::new(AtomicU32::new(0));
let subaddresses = SubaddressCache::init(
&invoices_db,
viewpair,
highest_minor_index.clone(),
self.seed,
)?;
debug!("Generated {} initial subaddresses", subaddresses.len());
let (scanner_tx, scanner_rx) = mpsc::channel();
let scanner_command_sender = (Mutex::new(scanner_tx), Arc::new(Mutex::new(scanner_rx)));
Ok(PaymentGateway(Arc::new(PaymentGatewayInner {
rpc_client,
viewpair,
scan_interval: self.scan_interval,
invoices_db,
subaddresses: Mutex::new(subaddresses),
highest_minor_index,
block_cache_height: Arc::new(atomic::AtomicU64::new(0)),
cached_daemon_height: Arc::new(atomic::AtomicU64::new(0)),
scanner_handle: Mutex::new(None),
scanner_command_sender,
})))
}
}
/// Status of the payment gateway's scanning thread, as reported by `PaymentGateway::status`.
#[derive(Debug)]
pub enum PaymentGatewayStatus {
/// A scanning thread exists and has not finished.
Running,
/// There is no scanning thread, or it finished without an error.
NotRunning,
/// The scanning thread finished with the contained error, or panicked.
Error(AcceptXmrError),
}
/// Commands sent from the payment gateway to the scanning thread over the command channel.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub(crate) enum MessageToScanner {
/// Tells the scanning loop to exit.
Stop,
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::env;

    use tempfile::{Builder, TempDir};

    use crate::PaymentGatewayBuilder;

    const PRIVATE_VIEW_KEY: &str =
        "ad2093a5705b9f33e6f0f0c1bc1f5f639c756cdfc168c8f2ac6127ccbdab3a03";
    const PRIMARY_ADDRESS: &str =
        "4613YiHLM6JMH4zejMB2zJY5TwQCxL8p65ufw8kBP5yxX9itmuGLqp1dS4tkVoTxjyH3aYhYNrtGHbQzJQP5bFus3KHVdmf";

    /// Sets `RUST_LOG` and initialises `env_logger` in test mode; repeated calls are harmless
    /// because the result of `try_init` is ignored.
    fn init_logger() {
        env::set_var(
            "RUST_LOG",
            "debug,mio=debug,want=debug,reqwest=info,sled=info,hyper=info,tracing=debug,httpmock=info,isahc=info",
        );
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Creates a temporary directory to hold a test database.
    fn new_temp_dir() -> TempDir {
        let mut dir_builder = Builder::new();
        dir_builder.prefix("temp_db_").rand_bytes(16);
        dir_builder
            .tempdir()
            .expect("failed to generate temporary directory")
    }

    /// The URL given to the builder ends up in the gateway's RPC client.
    #[test]
    fn daemon_url() {
        init_logger();

        let temp_dir = new_temp_dir();
        let db_path = temp_dir
            .path()
            .to_str()
            .expect("failed to get temporary directory path")
            .to_string();

        let payment_gateway =
            PaymentGatewayBuilder::new(PRIVATE_VIEW_KEY.to_string(), PRIMARY_ADDRESS.to_string())
                .db_path(db_path)
                .daemon_url("http://example.com:18081".to_string())
                .build()
                .expect("failed to build payment gateway");

        assert_eq!(
            payment_gateway.rpc_client.url(),
            "http://example.com:18081/"
        );
    }
}