use std::{
fmt::Debug,
ops::Deref,
str::FromStr,
sync::{
atomic::{self, AtomicU32, AtomicU64},
mpsc::{channel, Receiver, Sender, TryRecvError},
Arc, Mutex, PoisonError,
},
time::Duration,
};
use hyper::Uri;
use log::{debug, error, info, trace, warn};
use monero::cryptonote::onetime_key::SubKeyChecker;
use tokio::{join, sync::Mutex as AsyncMutex, time};
use crate::{
caching::SubaddressCache,
monerod_client::{
Client as MonerodClient, MockClient as MonerodMockClient, RpcClient as MonerodRpcClient,
},
pubsub::{Publisher, Subscriber},
scanner::{Scanner, ScannerHandle},
storage::{Client as StorageClient, Storage},
AcceptXmrError, Invoice, InvoiceId,
};
/// Scan interval used by `PaymentGatewayBuilder::new` unless overridden with `scan_interval`.
const DEFAULT_SCAN_INTERVAL: Duration = Duration::from_millis(1000);
/// Daemon URL used by `PaymentGatewayBuilder::new` unless overridden with `daemon_url`.
const DEFAULT_DAEMON: &str = "http://node.moneroworld.com:18089";
/// RPC connection timeout used by `PaymentGatewayBuilder::new` unless overridden with
/// `rpc_connection_timeout`.
const DEFAULT_RPC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
/// Total RPC timeout used by `PaymentGatewayBuilder::new` unless overridden with `rpc_timeout`.
const DEFAULT_RPC_TOTAL_TIMEOUT: Duration = Duration::from_secs(10);
/// Block cache size handed to `Scanner::new` when the gateway is run.
// NOTE(review): presumably counted in blocks — confirm against `Scanner::new`.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 10;
/// Handle to a payment gateway.
///
/// This is a thin wrapper around an `Arc<PaymentGatewayInner>`: cloning it produces another
/// handle to the same shared state, and it dereferences to the inner struct.
pub struct PaymentGateway<S: Storage, M: MonerodClient = MonerodRpcClient>(
pub(crate) Arc<PaymentGatewayInner<S, M>>,
);
/// Shared state behind a `PaymentGateway` handle.
#[doc(hidden)]
pub struct PaymentGatewayInner<S: Storage, M: MonerodClient = MonerodRpcClient> {
/// Client used to talk to the monero daemon (height queries, URL reporting, scanning).
monerod_client: M,
/// Private view key paired with the public spend key of the primary address.
viewpair: monero::ViewPair,
/// Period of the interval timer that paces scans once the scanner is synchronized.
scan_interval: Duration,
/// Client wrapping the user-provided invoice store.
store: StorageClient<S>,
/// Cache of subaddresses; `new_invoice` removes one and `remove_invoice` puts it back.
subaddresses: Mutex<SubaddressCache>,
/// Account (major) index, set through the builder's `account_index`.
major_index: u32,
/// Highest minor index so far; shared with the subaddress cache and read by the scanning
/// task to size its `SubKeyChecker`.
highest_minor_index: Arc<AtomicU32>,
/// Optional initial height forwarded to `Scanner::new`.
initial_height: Option<u64>,
/// Block cache height, shared with the scanner.
block_cache_height: Arc<AtomicU64>,
/// Last daemon height seen, shared with the scanner. `new_invoice` treats `0` as
/// "not known yet" and queries the daemon directly instead.
cached_daemon_height: Arc<AtomicU64>,
/// Handle of the spawned scanning task; `None` until `run` stores one, and again after
/// `stop` or `status` takes it.
scanner_handle: AsyncMutex<Option<ScannerHandle>>,
/// Both ends of the scanner command channel: the sender is used by `stop`, and the receiver
/// is cloned into the scanning task by `run`.
scanner_command_sender: (
Mutex<Sender<MessageToScanner>>,
Arc<Mutex<Receiver<MessageToScanner>>>,
),
/// Publisher through which invoice subscriptions are created.
publisher: Arc<Publisher>,
}
impl<S: Storage, M: MonerodClient> Clone for PaymentGateway<S, M> {
fn clone(&self) -> Self {
PaymentGateway(self.0.clone())
}
}
// Lets callers reach the fields and methods of the shared inner state directly through the handle.
impl<S: Storage, M: MonerodClient> Deref for PaymentGateway<S, M> {
    type Target = PaymentGatewayInner<S, M>;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
impl<S: Storage + 'static, M: MonerodClient + 'static> PaymentGateway<S, M> {
/// Returns a [`PaymentGatewayBuilder`] seeded with the given private view key, primary address
/// and invoice store. All other settings start at their defaults.
#[must_use]
pub fn builder(
    private_view_key: String,
    primary_address: String,
    store: S,
) -> PaymentGatewayBuilder<S> {
    PaymentGatewayBuilder::new(private_view_key, primary_address, store)
}
/// Starts the blockchain scanner as a background tokio task.
///
/// A `Scanner` is created from clones of the gateway's daemon client, store, shared height
/// counters and publisher, and is then driven in a loop by a task spawned with `tokio::spawn`.
/// Each iteration of that loop:
///
/// 1. polls the command channel and exits on `MessageToScanner::Stop` or on disconnect,
/// 2. rebuilds the `SubKeyChecker` if the highest minor index has outgrown its table,
/// 3. scans once — paced by the scan interval when the scanner reports itself synchronized,
///    immediately otherwise.
///
/// Scan errors are logged and the loop keeps going.
///
/// # Errors
///
/// * `AcceptXmrError::AlreadyRunning` if a previously started scanner task has not finished.
/// * Any error returned by `Scanner::new`.
#[allow(clippy::range_plus_one)]
pub async fn run(&self) -> Result<(), AcceptXmrError> {
    // Hold the handle lock from the "already running" check until the new handle is stored.
    // Releasing it in between would let two concurrent `run` calls both pass the check and
    // spawn two scanners.
    let mut scanner_handle = self.scanner_handle.lock().await;
    if let Some(handle) = scanner_handle.as_ref() {
        if !handle.is_finished() {
            return Err(AcceptXmrError::AlreadyRunning);
        }
    }

    // Gather everything the spawned task needs, since it cannot borrow from `self`.
    let monerod_client = self.monerod_client.clone();
    let viewpair = self.viewpair;
    let scan_interval = self.scan_interval;
    let major_index = self.major_index;
    let highest_minor_index = self.highest_minor_index.clone();
    let block_cache_height = self.block_cache_height.clone();
    let cached_daemon_height = self.cached_daemon_height.clone();
    let initial_height = self.initial_height;
    let publisher = self.publisher.clone();
    let store = self.store.clone();
    let command_receiver = self.scanner_command_sender.1.clone();

    debug!("Creating blockchain scanner");
    let mut scanner: Scanner<S, M> = Scanner::new(
        monerod_client,
        store,
        DEFAULT_BLOCK_CACHE_SIZE,
        block_cache_height,
        cached_daemon_height,
        initial_height,
        publisher,
    )
    .await?;

    info!("Starting blockchain scanner");
    *scanner_handle = Some(ScannerHandle::from(tokio::spawn(async move {
        // Covers the single configured major index and minor indexes 0..=highest.
        let mut sub_key_checker = SubKeyChecker::new(
            &viewpair,
            major_index..major_index.saturating_add(1),
            0..highest_minor_index
                .load(atomic::Ordering::Relaxed)
                .saturating_add(1),
        );
        let mut blockscan_interval = time::interval(scan_interval);
        loop {
            // Non-blocking poll for commands. The std mutex guard is a temporary of this
            // `match` and no arm awaits, so it is never held across an await point.
            match command_receiver
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .try_recv()
            {
                Ok(MessageToScanner::Stop) => {
                    info!("Scanner received stop signal. Stopping scanning thread");
                    break;
                }
                Err(TryRecvError::Empty) => {}
                Err(TryRecvError::Disconnected) => {
                    error!(
                        "Scanner lost connection to payment gateway. Stopping scanning thread."
                    );
                    break;
                }
            }

            // Rebuild the checker once the highest minor index no longer fits in its table.
            if sub_key_checker.table.len()
                <= highest_minor_index.load(atomic::Ordering::Relaxed) as usize
            {
                sub_key_checker = SubKeyChecker::new(
                    &viewpair,
                    major_index..major_index.saturating_add(1),
                    0..highest_minor_index
                        .load(atomic::Ordering::Relaxed)
                        .saturating_add(1),
                );
            }

            if let Err(e) = if scanner.is_synchronized().await {
                // Synchronized: wait for both the interval tick and the scan.
                trace!("Waiting for scan interval.");
                let (_, result) =
                    join!(blockscan_interval.tick(), scanner.scan(&sub_key_checker));
                result
            } else {
                // Behind the daemon: scan back-to-back without waiting for the interval.
                trace!(
                    "Scanning at max speed to catch up. Cache height: {}, daemon height: {}",
                    scanner.cache_height().await,
                    scanner.daemon_height().await
                );
                scanner.scan(&sub_key_checker).await
            } {
                error!(
                    "Payment gateway encountered an error while scanning for payments: {}",
                    e
                );
            };
        }
        Ok(())
    })));
    debug!("Scanner started successfully");
    Ok(())
}
/// Reports whether the scanner task is running.
///
/// * `NotRunning` — no scanner handle is stored, or the stored task finished without error.
/// * `Running` — a scanner handle is stored and its task has not finished.
/// * `Error` — the stored task finished and joining it produced an error.
///
/// When the task has finished, its handle is removed from the gateway so it can be joined;
/// later calls then report `NotRunning`.
#[must_use]
pub async fn status(&self) -> PaymentGatewayStatus {
    // Take a single guard and reuse it. The tokio mutex is not reentrant, so locking
    // `scanner_handle` a second time while this guard is alive would never resolve.
    let mut scanner_handle = self.scanner_handle.lock().await;
    match scanner_handle.as_ref() {
        None => return PaymentGatewayStatus::NotRunning,
        Some(handle) if !handle.is_finished() => return PaymentGatewayStatus::Running,
        Some(_) => {}
    }

    // The task has finished: take ownership of the handle so it can be joined, and release
    // the lock before awaiting the join.
    let finished_handle = scanner_handle.take();
    drop(scanner_handle);

    match finished_handle {
        Some(handle) => match handle.join().await {
            Ok(()) => PaymentGatewayStatus::NotRunning,
            Err(e) => PaymentGatewayStatus::Error(AcceptXmrError::Scanner(e)),
        },
        None => PaymentGatewayStatus::NotRunning,
    }
}
/// Stops the scanner task, if there is one, and waits for it to finish.
///
/// The stored handle is taken out of the gateway. If its task is still running, a
/// `MessageToScanner::Stop` command is sent first; in either case the task is then joined.
/// Returns `Ok(())` immediately when no handle is stored.
///
/// # Errors
///
/// * `AcceptXmrError::StopSignal` if the stop command could not be sent.
/// * `AcceptXmrError::Scanner` if joining the task yields an error.
pub async fn stop(&self) -> Result<(), AcceptXmrError> {
    // The guard stays alive until this function returns, so the handle slot remains locked
    // while the task is being joined.
    let mut handle_slot = self.scanner_handle.lock().await;
    let scanner_task = match handle_slot.take() {
        Some(task) => task,
        None => return Ok(()),
    };

    if !scanner_task.is_finished() {
        // Kept as one statement so the std mutex guard is dropped before the join is awaited.
        self.scanner_command_sender
            .0
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .send(MessageToScanner::Stop)
            .map_err(|e| AcceptXmrError::StopSignal(e.to_string()))?;
    }

    scanner_task.join().await.map_err(AcceptXmrError::Scanner)
}
/// Creates a new invoice, stores it, and registers it with the publisher.
///
/// The creation height is the cached daemon height when that is non-zero, otherwise it is
/// fetched from the daemon. A subaddress is then taken at random from the subaddress cache
/// and handed to `Invoice::new` together with the remaining arguments.
///
/// * `piconeros` — amount passed to `Invoice::new`.
/// * `confirmations_required`, `expiration_in`, `description` — passed to `Invoice::new`
///   unchanged.
///
/// Returns the ID of the stored invoice.
///
/// # Errors
///
/// Returns an error if the daemon height cannot be fetched or the invoice cannot be stored.
/// In neither case is a subaddress lost from the cache.
pub async fn new_invoice(
    &self,
    piconeros: u64,
    confirmations_required: u64,
    expiration_in: u64,
    description: String,
) -> Result<InvoiceId, AcceptXmrError> {
    // Resolve the height before taking a subaddress, so a daemon failure cannot strand a
    // subaddress outside the cache.
    let cached_daemon_height = self.cached_daemon_height.load(atomic::Ordering::Relaxed);
    let creation_height = if cached_daemon_height != 0 {
        cached_daemon_height
    } else {
        self.daemon_height().await?
    };

    let (sub_index, subaddress) = self
        .subaddresses
        .lock()
        .unwrap_or_else(PoisonError::into_inner)
        .remove_random();

    let invoice = Invoice::new(
        subaddress,
        sub_index,
        creation_height,
        piconeros,
        confirmations_required,
        expiration_in,
        description,
    );

    let inserted = self.store.insert_invoice(invoice.clone()).await;
    if inserted.is_err() {
        // The invoice was not stored, so give the subaddress back to the cache the same way
        // `remove_invoice` does.
        self.subaddresses
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .insert(invoice.id().sub_index, invoice.address().to_string());
    }
    inserted?;

    debug!(
        "Now tracking invoice to subaddress index {}",
        invoice.index()
    );
    self.publisher.insert_invoice(invoice.id());
    Ok(invoice.id())
}
/// Removes an invoice from the store and stops tracking it.
///
/// When the invoice existed, its subaddress is returned to the subaddress cache and the
/// publisher is told to drop it. A warning is logged if the invoice was neither expired, nor
/// confirmed with a creation height below its current height.
///
/// Returns the removed invoice, or `None` if the store held no invoice with this ID.
///
/// # Errors
///
/// Returns an error if the store fails to remove the invoice.
pub async fn remove_invoice(
    &self,
    invoice_id: InvoiceId,
) -> Result<Option<Invoice>, AcceptXmrError> {
    let removed = match self.store.remove_invoice(invoice_id).await? {
        Some(invoice) => invoice,
        None => return Ok(None),
    };

    let expected_removal = removed.is_expired()
        || (removed.is_confirmed() && removed.creation_height() < removed.current_height());
    if !expected_removal {
        warn!("Removed an invoice which was neither expired, nor fully confirmed and a block or more old. Was this intentional?");
    }

    // Make the subaddress available to future invoices again.
    self.subaddresses
        .lock()
        .unwrap_or_else(PoisonError::into_inner)
        .insert(invoice_id.sub_index, removed.address().to_string());
    self.publisher.remove_invoice(invoice_id);

    Ok(Some(removed))
}
/// Asks the publisher for a subscriber to the given invoice. The result is whatever
/// `Publisher::subscribe` returns for this ID, which may be `None`.
#[must_use]
pub fn subscribe(&self, invoice_id: InvoiceId) -> Option<Subscriber> {
    let publisher = &self.publisher;
    publisher.subscribe(invoice_id)
}
/// Asks the publisher for a subscriber created by `Publisher::subscribe_all`.
#[must_use]
pub fn subscribe_all(&self) -> Subscriber {
    let publisher = &self.publisher;
    publisher.subscribe_all()
}
/// Queries the monero daemon for its current height.
///
/// # Errors
///
/// Returns an error if the daemon client's height request fails.
pub async fn daemon_height(&self) -> Result<u64, AcceptXmrError> {
    let height = self.monerod_client.daemon_height().await?;
    Ok(height)
}
#[must_use]
#[doc(hidden)]
pub fn cache_height(&self) -> u64 {
use std::sync::atomic::Ordering;
self.block_cache_height.load(Ordering::Relaxed)
}
/// Looks up an invoice in the store by ID.
///
/// Returns whatever the store holds for this ID, which may be `None`.
///
/// # Errors
///
/// Returns an error if the store lookup fails.
pub async fn get_invoice(
    &self,
    invoice_id: InvoiceId,
) -> Result<Option<Invoice>, AcceptXmrError> {
    let found = self.store.get_invoice(invoice_id).await?;
    Ok(found)
}
/// Returns the invoice IDs reported by the store.
///
/// # Errors
///
/// Returns an error if the store lookup fails.
pub async fn get_invoice_ids(&self) -> Result<Vec<InvoiceId>, AcceptXmrError> {
    let ids = self.store.get_invoice_ids().await?;
    Ok(ids)
}
#[must_use]
pub fn daemon_url(&self) -> String {
self.monerod_client.url()
}
}
/// Builder collecting the settings from which a `PaymentGateway` is constructed.
pub struct PaymentGatewayBuilder<S> {
/// Daemon URL; parsed as a `Uri` by `build`.
daemon_url: String,
/// Daemon login username; set together with the password by `daemon_login`.
daemon_username: Option<String>,
/// Daemon login password; set together with the username by `daemon_login`.
daemon_password: Option<String>,
/// Total RPC timeout handed to the daemon RPC client.
rpc_timeout: Duration,
/// RPC connection timeout handed to the daemon RPC client.
rpc_connection_timeout: Duration,
/// Private view key in string form; parsed as a `monero::PrivateKey` when building.
private_view_key: String,
/// Primary address in string form; parsed as a `monero::Address` when building, after which
/// only its public spend key is kept.
primary_address: String,
/// Scan interval stored on the built gateway.
scan_interval: Duration,
/// User-provided invoice store; wrapped in a `StorageClient` when building.
store: S,
/// Account (major) index, set by `account_index`.
major_index: u32,
/// Optional initial height stored on the built gateway.
initial_height: Option<u64>,
/// Optional seed, forwarded to the daemon RPC client and to the subaddress cache.
seed: Option<u64>,
}
impl<S: Storage + 'static> PaymentGatewayBuilder<S> {
#[must_use]
pub fn new(
private_view_key: String,
primary_address: String,
store: S,
) -> PaymentGatewayBuilder<S> {
PaymentGatewayBuilder {
daemon_url: DEFAULT_DAEMON.to_string(),
daemon_username: None,
daemon_password: None,
rpc_timeout: DEFAULT_RPC_TOTAL_TIMEOUT,
rpc_connection_timeout: DEFAULT_RPC_CONNECTION_TIMEOUT,
private_view_key,
primary_address,
scan_interval: DEFAULT_SCAN_INTERVAL,
store,
major_index: 0,
initial_height: None,
seed: None,
}
}
#[must_use]
pub fn daemon_url(mut self, url: String) -> PaymentGatewayBuilder<S> {
self.daemon_url = url;
self
}
#[must_use]
pub fn daemon_login(mut self, username: String, password: String) -> PaymentGatewayBuilder<S> {
self.daemon_username = Some(username);
self.daemon_password = Some(password);
self
}
#[must_use]
pub fn rpc_timeout(mut self, timeout: Duration) -> PaymentGatewayBuilder<S> {
self.rpc_timeout = timeout;
self
}
#[must_use]
pub fn rpc_connection_timeout(mut self, timeout: Duration) -> PaymentGatewayBuilder<S> {
self.rpc_connection_timeout = timeout;
self
}
#[must_use]
pub fn scan_interval(mut self, interval: Duration) -> PaymentGatewayBuilder<S> {
self.scan_interval = interval;
self
}
#[must_use]
pub fn seed(mut self, seed: u64) -> PaymentGatewayBuilder<S> {
warn!("Seed set to {}. Some operations intended to be random (like the order in which subaddresses are used) will be predictable.", seed);
self.seed = Some(seed);
self
}
#[must_use]
pub fn account_index(mut self, index: u32) -> PaymentGatewayBuilder<S> {
self.major_index = index;
self
}
#[must_use]
pub fn initial_height(mut self, height: u64) -> PaymentGatewayBuilder<S> {
self.initial_height = Some(height);
self
}
/// Builds a payment gateway backed by a real daemon RPC client.
///
/// # Errors
///
/// * `AcceptXmrError::Parse` if the daemon URL is not a valid `Uri`.
/// * Any error produced while assembling the gateway (key or address parsing, subaddress
///   cache initialization).
pub async fn build(self) -> Result<PaymentGateway<S>, AcceptXmrError> {
    // Validate the URL up front; nothing else is constructed if it is malformed.
    let daemon_uri = match self.daemon_url.parse::<Uri>() {
        Ok(uri) => uri,
        Err(e) => {
            return Err(AcceptXmrError::Parse {
                datatype: "Uri",
                input: self.daemon_url.clone(),
                error: e.to_string(),
            })
        }
    };

    let monerod_client = MonerodRpcClient::new(
        daemon_uri,
        self.rpc_timeout,
        self.rpc_connection_timeout,
        self.daemon_username.clone(),
        self.daemon_password.clone(),
        self.seed,
    );
    self.build_inner(monerod_client).await
}
/// Builds a payment gateway backed by a mock daemon client instead of a real RPC client.
///
/// # Errors
///
/// Returns any error produced while assembling the gateway (key or address parsing,
/// subaddress cache initialization).
pub async fn build_with_mock_daemon(
    self,
) -> Result<PaymentGateway<S, MonerodMockClient>, AcceptXmrError> {
    self.build_inner(MonerodMockClient::new()).await
}
async fn build_inner<M: MonerodClient>(
self,
monerod_client: M,
) -> Result<PaymentGateway<S, M>, AcceptXmrError> {
let store = StorageClient::new(self.store);
let viewpair = monero::ViewPair {
view: monero::PrivateKey::from_str(&self.private_view_key).map_err(|e| {
AcceptXmrError::Parse {
datatype: "PrivateKey",
input: "[REDACTED]".to_string(),
error: e.to_string(),
}
})?,
spend: monero::Address::from_str(&self.primary_address)
.map_err(|e| AcceptXmrError::Parse {
datatype: "Address",
input: self.primary_address.to_string(),
error: e.to_string(),
})?
.public_spend,
};
let highest_minor_index = Arc::new(AtomicU32::new(0));
let subaddresses = SubaddressCache::init(
&store,
viewpair,
self.major_index,
highest_minor_index.clone(),
self.seed,
)
.await?;
debug!("Generated {} initial subaddresses", subaddresses.len());
let (scanner_cmd_tx, scanner_cmd_rx) = channel();
let scanner_command_sender = (
Mutex::new(scanner_cmd_tx),
Arc::new(Mutex::new(scanner_cmd_rx)),
);
Ok(PaymentGateway(Arc::new(PaymentGatewayInner {
monerod_client,
viewpair,
scan_interval: self.scan_interval,
store,
subaddresses: Mutex::new(subaddresses),
major_index: self.major_index,
highest_minor_index,
initial_height: self.initial_height,
block_cache_height: Arc::new(atomic::AtomicU64::new(0)),
cached_daemon_height: Arc::new(atomic::AtomicU64::new(0)),
scanner_handle: AsyncMutex::new(None),
scanner_command_sender,
publisher: Arc::new(Publisher::new()),
})))
}
}
/// Scanner state as reported by `PaymentGateway::status`.
#[derive(Debug)]
pub enum PaymentGatewayStatus {
/// A scanner task exists and has not finished.
Running,
/// No scanner task exists, or the task finished without error.
NotRunning,
/// The scanner task finished and joining it produced this error.
Error(AcceptXmrError),
}
/// Commands sent from the payment gateway to the scanning task over the command channel.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub(crate) enum MessageToScanner {
/// Tells the scanning task to leave its loop; sent by `PaymentGateway::stop`.
Stop,
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use testing_utils::{init_logger, PRIMARY_ADDRESS, PRIVATE_VIEW_KEY};

    use crate::{storage::stores::InMemory, MonerodClient, PaymentGateway, PaymentGatewayBuilder};

    /// A daemon URL set on the builder is reported back by the built gateway's daemon client,
    /// with a trailing slash appended.
    #[tokio::test]
    async fn daemon_url() {
        init_logger();

        let builder = PaymentGatewayBuilder::<InMemory>::new(
            PRIVATE_VIEW_KEY.to_string(),
            PRIMARY_ADDRESS.to_string(),
            InMemory::new(),
        )
        .daemon_url("http://example.com:18081".to_string());
        let payment_gateway: PaymentGateway<InMemory> = builder.build().await.unwrap();

        let reported_url = payment_gateway.monerod_client.url();
        assert_eq!(reported_url, "http://example.com:18081/");
    }
}