acceptxmr 0.11.1

Accept monero in your application.
Documentation
use std::{
    ops::Deref,
    str::FromStr,
    sync::atomic::{self, AtomicU32, AtomicU64},
    sync::{mpsc, Arc, Mutex, PoisonError},
    thread::{self, JoinHandle},
    time::Duration,
};

use hyper::Uri;
use log::{debug, error, info, warn};
use monero::cryptonote::onetime_key::SubKeyChecker;
use tokio::{join, runtime::Runtime, time};

use crate::{
    caching::SubaddressCache,
    invoices_db::InvoicesDb,
    rpc::RpcClient,
    scanner::Scanner,
    subscriber::Subscriber,
    {AcceptXmrError, Invoice, InvoiceId},
};

const DEFAULT_SCAN_INTERVAL: Duration = Duration::from_millis(1000);
const DEFAULT_DAEMON: &str = "http://node.moneroworld.com:18089";
/// Timeout for RPC connection formation.
const DEFAULT_RPC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout for total call completion.
const DEFAULT_RPC_TOTAL_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_DB_PATH: &str = "AcceptXMR_DB";
const DEFAULT_BLOCK_CACHE_SIZE: usize = 10;

/// The `PaymentGateway` allows you to track new [`Invoice`](Invoice)s, remove old `Invoice`s from
/// tracking, and subscribe to `Invoice`s that are already pending.
#[derive(Clone)]
pub struct PaymentGateway(pub(crate) Arc<PaymentGatewayInner>);

#[doc(hidden)]
pub struct PaymentGatewayInner {
    rpc_client: RpcClient,
    viewpair: monero::ViewPair,
    scan_interval: Duration,
    invoices_db: InvoicesDb,
    subaddresses: Mutex<SubaddressCache>,
    highest_minor_index: Arc<AtomicU32>,
    block_cache_height: Arc<AtomicU64>,
    cached_daemon_height: Arc<AtomicU64>,
    scanner_handle: Mutex<Option<JoinHandle<Result<(), AcceptXmrError>>>>,
    scanner_command_sender: (
        Mutex<mpsc::Sender<MessageToScanner>>,
        Arc<Mutex<mpsc::Receiver<MessageToScanner>>>,
    ),
}

impl Deref for PaymentGateway {
    type Target = PaymentGatewayInner;

    fn deref(&self) -> &PaymentGatewayInner {
        &self.0
    }
}

impl PaymentGateway {
    /// Returns a builder used to create a new payment gateway.
    #[must_use]
    pub fn builder(private_view_key: String, primary_address: String) -> PaymentGatewayBuilder {
        PaymentGatewayBuilder::new(private_view_key, primary_address)
    }

    /// Runs the payment gateway. This function spawns a new thread, which periodically scans new
    /// blocks and transactions from the configured daemon and updates pending [`Invoice`](Invoice)s
    /// in the database.
    ///
    /// # Errors
    ///
    /// * Returns an [`AcceptXmrError::InvoiceStorage`] error if there was an underlying issue with
    ///   the database.
    ///
    /// * Returns an [`AcceptXmrError::Rpc`] error if there was an issue getting necessary data from
    ///   the monero daemon while starting.
    ///
    /// * Returns an [`AcceptXmrError::AlreadyRunning`] error if the payment gateway is already
    ///   running.
    ///
    /// * Returns an [`AcceptXmrError::Threading`] error if there was an error creating the scanning
    ///   thread.
    #[allow(clippy::range_plus_one, clippy::missing_panics_doc)]
    pub async fn run(&self) -> Result<(), AcceptXmrError> {
        // Determine if the scanning thread is already running.
        {
            let scanner_handle = self
                .scanner_handle
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            if let Some(handle) = scanner_handle.as_ref() {
                if !handle.is_finished() {
                    return Err(AcceptXmrError::AlreadyRunning);
                }
            };
        }

        // Gather info needed by the scanner.
        let rpc_client = self.rpc_client.clone();
        let viewpair = self.viewpair;
        let scan_interval = self.scan_interval;
        let highest_minor_index = self.highest_minor_index.clone();
        let block_cache_height = self.block_cache_height.clone();
        let cached_daemon_height = self.cached_daemon_height.clone();
        let pending_invoices = self.invoices_db.clone();
        let command_receiver = self.scanner_command_sender.1.clone();

        // Create scanner.
        debug!("Creating blockchain scanner");
        let mut scanner = Scanner::new(
            rpc_client,
            pending_invoices,
            DEFAULT_BLOCK_CACHE_SIZE,
            block_cache_height,
            cached_daemon_height,
        )
        .await?;

        // Spawn the scanning thread.
        info!("Starting blockchain scanner");
        *self.scanner_handle.lock().unwrap_or_else(PoisonError::into_inner) = Some(thread::Builder::new()
            .name("Scanning Thread".to_string())
            .spawn(move || -> Result<(), AcceptXmrError> {
                // The thread needs a tokio runtime to process async functions.
                let tokio_runtime = Runtime::new()?;
                tokio_runtime.block_on(async move {
                    // Create persistent sub key checker for efficient tx output checking.
                    let mut sub_key_checker = SubKeyChecker::new(
                        &viewpair,
                        1..2,
                        0..highest_minor_index.load(atomic::Ordering::Relaxed) + 1,
                    );
                    // Scan for transactions once every scan_interval.
                    let mut blockscan_interval = time::interval(scan_interval);
                    loop {
                        // If we're received the stop signal, stop.
                        match command_receiver.lock().unwrap_or_else(PoisonError::into_inner).try_recv() {
                            Ok(MessageToScanner::Stop) => {
                                info!("Scanner received stop signal. Stopping gracefully");
                                break;
                            }
                            Err(mpsc::TryRecvError::Empty) => {
                            }
                            Err(mpsc::TryRecvError::Disconnected) => {
                                error!("Scanner lost connection to payment gateway. Stopping gracefully.");
                                break;
                            }
                        }
                        // Update sub key checker if necessary.
                        if sub_key_checker.table.len()
                            <= highest_minor_index.load(atomic::Ordering::Relaxed) as usize
                        {
                            sub_key_checker = SubKeyChecker::new(
                                &viewpair,
                                1..2,
                                0..highest_minor_index.load(atomic::Ordering::Relaxed) + 1,
                            );
                        }
                        // Scan!
                        if let (_, Err(e)) = join!(blockscan_interval.tick(), scanner.scan(&sub_key_checker)) {
                            error!("Payment gateway encountered an error while scanning for payments: {}", e);
                        };
                    }
                });
                Ok(())
            })?);
        debug!("Scanner started successfully");
        Ok(())
    }

    /// Returns the enum [`PaymentGatewayStatus`] describing whether the payment gateway is running,
    /// not running, or has experienced an error.
    #[must_use]
    pub fn status(&self) -> PaymentGatewayStatus {
        let scanner_handle = self
            .scanner_handle
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        match scanner_handle.as_ref() {
            None => PaymentGatewayStatus::NotRunning,
            Some(handle) if handle.is_finished() => {
                let owned_handle = self
                    .scanner_handle
                    .lock()
                    .unwrap_or_else(PoisonError::into_inner)
                    .take();
                match owned_handle.map(std::thread::JoinHandle::join) {
                    None | Some(Ok(Ok(_))) => PaymentGatewayStatus::NotRunning,
                    Some(Ok(Err(e))) => {
                        PaymentGatewayStatus::Error(AcceptXmrError::ScanningThread(Box::new(e)))
                    }
                    Some(Err(_)) => {
                        PaymentGatewayStatus::Error(AcceptXmrError::ScanningThreadPanic)
                    }
                }
            }
            Some(_) => PaymentGatewayStatus::Running,
        }
    }

    /// Stops the payment gateway, blocking until complete. If the payment gateway is not running,
    /// this method does nothing.
    ///
    /// # Errors
    ///
    /// * Returns an [`AcceptXmrError::StopSignal`] error if the payment gateway could not be stopped.
    ///
    /// * Returns an [`AcceptXmrError::ScanningThread`] error if the scanning thread exited with an error.
    ///
    /// * Returns an [`AcceptXmrError::ScanningThreadPanic`] error if the scanning thread exited with a panic.
    pub fn stop(&self) -> Result<(), AcceptXmrError> {
        match self
            .scanner_handle
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .take()
        {
            None => Ok(()),
            Some(thread) if thread.is_finished() => match thread.join() {
                Ok(Ok(_)) => Ok(()),
                Ok(Err(e)) => Err(AcceptXmrError::ScanningThread(Box::new(e))),
                Err(_) => Err(AcceptXmrError::ScanningThreadPanic),
            },
            Some(thread) => {
                self.scanner_command_sender
                    .0
                    .lock()
                    .unwrap_or_else(PoisonError::into_inner)
                    .send(MessageToScanner::Stop)
                    .map_err(|e| AcceptXmrError::StopSignal(e.to_string()))?;
                match thread.join() {
                    Ok(Ok(_)) => Ok(()),
                    Ok(Err(e)) => Err(AcceptXmrError::ScanningThread(Box::new(e))),
                    Err(_) => Err(AcceptXmrError::ScanningThreadPanic),
                }
            }
        }
    }

    /// Adds a new [`Invoice`] to the payment gateway for tracking, and returns the ID of the new
    /// invoice. Use a [`Subscriber`] to receive updates on the new invoice invoice as they occur.
    ///
    /// # Errors
    ///
    /// Returns an error if there are any underlying issues modifying data in the
    /// database.
    pub async fn new_invoice(
        &self,
        piconeros: u64,
        confirmations_required: u64,
        expiration_in: u64,
        description: String,
    ) -> Result<InvoiceId, AcceptXmrError> {
        let amount = piconeros;

        // Get subaddress in base58, and subaddress index.
        let (sub_index, subaddress) = self
            .subaddresses
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .remove_random();

        let creation_height = self.cached_daemon_height.load(atomic::Ordering::Relaxed);

        // Create invoice object.
        let invoice = Invoice::new(
            subaddress,
            sub_index,
            creation_height,
            amount,
            confirmations_required,
            expiration_in,
            description,
        );

        // Insert invoice into database for tracking.
        self.invoices_db.insert(&invoice)?;
        debug!(
            "Now tracking invoice to subaddress index {}",
            invoice.index()
        );

        // Return invoice id so the user can build identify their invoice, and make a subscriber for
        // it if desired.
        Ok(invoice.id())
    }

    /// Remove (i.e. stop tracking) invoice, returning the old invoice if it existed.
    ///
    /// # Errors
    ///
    /// Returns an error if there are any underlying issues modifying/retrieving data in the
    /// database.
    pub fn remove_invoice(&self, invoice_id: InvoiceId) -> Result<Option<Invoice>, AcceptXmrError> {
        match self.invoices_db.remove(invoice_id)? {
            Some(old) => {
                if !(old.is_expired()
                    || old.is_confirmed() && old.creation_height() < old.current_height())
                {
                    warn!("Removed an invoice which was neither expired, nor fully confirmed and a block or more old. Was this intentional?");
                }
                // Put the subaddress back in the subaddress cache.
                self.subaddresses
                    .lock()
                    .unwrap_or_else(PoisonError::into_inner)
                    .insert(invoice_id.sub_index, old.address().to_string());

                Ok(Some(old))
            }
            None => Ok(None),
        }
    }

    /// Returns a `Subscriber` for the given invoice ID. If a tracked invoice exists for that
    /// ID, the subscriber can be used to receive updates to for that invoice.
    ///
    /// # Errors
    ///
    /// Returns an error if there is an underlying issue retrieving data from the database.
    pub fn subscribe(&self, invoice_id: InvoiceId) -> Result<Option<Subscriber>, AcceptXmrError> {
        Ok(self.invoices_db.subscribe(invoice_id)?)
    }

    /// Returns a `Subscriber` for all invoices.
    #[must_use]
    pub fn subscribe_all(&self) -> Subscriber {
        self.invoices_db.subscribe_all()
    }

    /// Get current height of daemon using a monero daemon remote procedure call.
    ///
    /// # Errors
    ///
    /// Returns an error if a connection can not be made to the daemon, or if the daemon's response
    /// cannot be parsed.
    pub async fn daemon_height(&self) -> Result<u64, AcceptXmrError> {
        Ok(self.rpc_client.daemon_height().await?)
    }

    /// Get current height of block cache.
    #[must_use]
    #[doc(hidden)]
    pub fn cache_height(&self) -> u64 {
        use std::sync::atomic::Ordering;
        self.block_cache_height.load(Ordering::Relaxed)
    }

    /// Get the up-to-date invoice associated with the given [`InvoiceId`], if it exists.
    ///
    /// # Errors
    ///
    /// Returns an error if there are any underlying issues retrieving data from the database.
    pub fn get_invoice(&self, invoice_id: InvoiceId) -> Result<Option<Invoice>, AcceptXmrError> {
        Ok(self.invoices_db.get(invoice_id)?)
    }

    /// Returns URL of configured daemon.
    #[must_use]
    pub fn daemon_url(&self) -> String {
        self.rpc_client.url()
    }
}

/// A builder for the payment gateway. Used to configure your desired monero daemon, scan interval,
/// view key, etc.
///
/// # Examples
///
/// ```
/// # use tempfile::Builder;
/// use acceptxmr::PaymentGatewayBuilder;
/// use std::time::Duration;
///
/// # let temp_dir = Builder::new()
/// #   .prefix("temp_db_")
/// #   .rand_bytes(16)
/// #   .tempdir().expect("Failed to generate temporary directory");
///
/// let private_view_key = "ad2093a5705b9f33e6f0f0c1bc1f5f639c756cdfc168c8f2ac6127ccbdab3a03";
/// let primary_address = "4613YiHLM6JMH4zejMB2zJY5TwQCxL8p65ufw8kBP5yxX9itmuGLqp1dS4tkVoTxjyH3aYhYNrtGHbQzJQP5bFus3KHVdmf";
///
/// // Create a payment gateway with an extra fast scan rate and a custom monero daemon URL.
/// let payment_gateway = PaymentGatewayBuilder::new(private_view_key.to_string(), primary_address.to_string())
///     .scan_interval(Duration::from_millis(100)) // Scan for invoice updates every 100 ms.
///     .daemon_url("http://example.com:18081".to_string()) // Set custom monero daemon URL.
/// #   .db_path(temp_dir.path().to_str().expect("Failed to get temporary directory path").to_string())
///     .build();
/// ```
pub struct PaymentGatewayBuilder {
    daemon_url: String,
    daemon_username: Option<String>,
    daemon_password: Option<String>,
    rpc_timeout: Duration,
    rpc_connection_timeout: Duration,
    private_view_key: String,
    primary_address: String,
    scan_interval: Duration,
    db_path: String,
    seed: Option<u64>,
}

impl PaymentGatewayBuilder {
    /// Create a new payment gateway builder.
    #[must_use]
    pub fn new(private_view_key: String, primary_address: String) -> PaymentGatewayBuilder {
        PaymentGatewayBuilder {
            daemon_url: DEFAULT_DAEMON.to_string(),
            daemon_username: None,
            daemon_password: None,
            rpc_timeout: DEFAULT_RPC_TOTAL_TIMEOUT,
            rpc_connection_timeout: DEFAULT_RPC_CONNECTION_TIMEOUT,
            private_view_key,
            primary_address,
            scan_interval: DEFAULT_SCAN_INTERVAL,
            db_path: DEFAULT_DB_PATH.to_string(),
            seed: None,
        }
    }

    /// Set the url and port of your preferred monero daemon. Defaults to
    /// [http://node.moneroworld.com:18089](http://node.moneroworld.com:18089).
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// #
    /// use acceptxmr::PaymentGatewayBuilder;
    ///
    /// let private_view_key = "ad2093a5705b9f33e6f0f0c1bc1f5f639c756cdfc168c8f2ac6127ccbdab3a03";
    /// let primary_address = "4613YiHLM6JMH4zejMB2zJY5TwQCxL8p65ufw8kBP5yxX9itmuGLqp1dS4tkVoTxjyH3aYhYNrtGHbQzJQP5bFus3KHVdmf";
    ///
    /// // Create a payment gateway with a custom monero daemon URL.
    /// let payment_gateway = PaymentGatewayBuilder::new(private_view_key.to_string(), primary_address.to_string())
    ///     .daemon_url("http://example.com:18081".to_string()) // Set custom monero daemon URL.
    ///     .build()?;
    ///
    /// // The payment gateway will now use the daemon specified.
    /// payment_gateway.run().await?;
    /// #   Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn daemon_url(mut self, url: String) -> PaymentGatewayBuilder {
        self.daemon_url = url;
        self
    }

    /// If your preferred daemon requires a password, configure it here.
    #[must_use]
    pub fn daemon_login(mut self, username: String, password: String) -> PaymentGatewayBuilder {
        self.daemon_username = Some(username);
        self.daemon_password = Some(password);
        self
    }

    /// Time before an remote procedure call times out. If this amount of time elapses without
    /// receiving a full response from the RPC daemon, the current scan will be aborted and
    /// restarted. Defaults to 10 seconds.
    #[must_use]
    pub fn rpc_timeout(mut self, timeout: Duration) -> PaymentGatewayBuilder {
        self.rpc_timeout = timeout;
        self
    }

    /// Time before a remote procedure call times out while failing to connect. If this amount of
    /// time elapses without managing to connect to the monero daemon, the current scan will be
    /// aborted and restarted. Defaults to 5 seconds.
    #[must_use]
    pub fn rpc_connection_timeout(mut self, timeout: Duration) -> PaymentGatewayBuilder {
        self.rpc_connection_timeout = timeout;
        self
    }

    /// Set the minimum scan interval. New blocks and transactions will be scanned for relevant
    /// outputs at most every `interval`. Defaults to 1 second.
    #[must_use]
    pub fn scan_interval(mut self, interval: Duration) -> PaymentGatewayBuilder {
        self.scan_interval = interval;
        self
    }

    /// Path to the pending invoices database. Defaults to `AcceptXMR_DB/`.
    #[must_use]
    pub fn db_path(mut self, path: String) -> PaymentGatewayBuilder {
        self.db_path = path;
        self
    }

    /// Seed for random number generator. Use only for reproducible testing. Do not set in a
    /// production environment.
    #[must_use]
    pub fn seed(mut self, seed: u64) -> PaymentGatewayBuilder {
        warn!("Seed set to {}. Some operations intended to be random (like the order in which subaddresses are used) will be predictable.", seed);
        self.seed = Some(seed);
        self
    }

    /// Build the payment gateway.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened at the path specified, if the internal RPC
    /// client cannot parse the provided URL, or if the primary address or private view key cannot
    /// be parsed.
    pub fn build(self) -> Result<PaymentGateway, AcceptXmrError> {
        let rpc_client = RpcClient::new(
            self.daemon_url
                .parse::<Uri>()
                .map_err(|e| AcceptXmrError::Parse {
                    datatype: "Uri",
                    input: self.daemon_url,
                    error: e.to_string(),
                })?,
            self.rpc_timeout,
            self.rpc_connection_timeout,
            self.daemon_username,
            self.daemon_password,
            self.seed,
        );
        let invoices_db = InvoicesDb::new(&self.db_path)?;
        info!("Opened database in \"{}/\"", self.db_path);

        let viewpair = monero::ViewPair {
            view: monero::PrivateKey::from_str(&self.private_view_key).map_err(|e| {
                AcceptXmrError::Parse {
                    datatype: "PrivateKey",
                    input: self.private_view_key.to_string(),
                    error: e.to_string(),
                }
            })?,
            spend: monero::Address::from_str(&self.primary_address)
                .map_err(|e| AcceptXmrError::Parse {
                    datatype: "Address",
                    input: self.primary_address.to_string(),
                    error: e.to_string(),
                })?
                .public_spend,
        };

        let highest_minor_index = Arc::new(AtomicU32::new(0));
        let subaddresses = SubaddressCache::init(
            &invoices_db,
            viewpair,
            highest_minor_index.clone(),
            self.seed,
        )?;
        debug!("Generated {} initial subaddresses", subaddresses.len());

        let (scanner_tx, scanner_rx) = mpsc::channel();
        let scanner_command_sender = (Mutex::new(scanner_tx), Arc::new(Mutex::new(scanner_rx)));

        Ok(PaymentGateway(Arc::new(PaymentGatewayInner {
            rpc_client,
            viewpair,
            scan_interval: self.scan_interval,
            invoices_db,
            subaddresses: Mutex::new(subaddresses),
            highest_minor_index,
            block_cache_height: Arc::new(atomic::AtomicU64::new(0)),
            cached_daemon_height: Arc::new(atomic::AtomicU64::new(0)),
            scanner_handle: Mutex::new(None),
            scanner_command_sender,
        })))
    }
}

/// Enumeration of possible payment gateway states.
#[derive(Debug)]
pub enum PaymentGatewayStatus {
    /// The payment gateway is scanning for incoming payments.
    Running,
    /// The payment gateway is not scanning for incoming payments.
    NotRunning,
    /// The payment gateway encountered an error while scanning for incoming payments, and had to
    /// stop.
    Error(AcceptXmrError),
}

#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub(crate) enum MessageToScanner {
    Stop,
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::env;

    use tempfile::{Builder, TempDir};

    use crate::PaymentGatewayBuilder;

    fn init_logger() {
        env::set_var(
            "RUST_LOG",
            "debug,mio=debug,want=debug,reqwest=info,sled=info,hyper=info,tracing=debug,httpmock=info,isahc=info",
        );
        let _ = env_logger::builder().is_test(true).try_init();
    }

    fn new_temp_dir() -> TempDir {
        Builder::new()
            .prefix("temp_db_")
            .rand_bytes(16)
            .tempdir()
            .expect("failed to generate temporary directory")
    }

    const PRIVATE_VIEW_KEY: &str =
        "ad2093a5705b9f33e6f0f0c1bc1f5f639c756cdfc168c8f2ac6127ccbdab3a03";
    const PRIMARY_ADDRESS: &str =
        "4613YiHLM6JMH4zejMB2zJY5TwQCxL8p65ufw8kBP5yxX9itmuGLqp1dS4tkVoTxjyH3aYhYNrtGHbQzJQP5bFus3KHVdmf";

    #[test]
    fn daemon_url() {
        // Setup.
        init_logger();
        let temp_dir = new_temp_dir();

        let payment_gateway =
            PaymentGatewayBuilder::new(PRIVATE_VIEW_KEY.to_string(), PRIMARY_ADDRESS.to_string())
                .db_path(
                    temp_dir
                        .path()
                        .to_str()
                        .expect("failed to get temporary directory path")
                        .to_string(),
                )
                .daemon_url("http://example.com:18081".to_string())
                .build()
                .expect("failed to build payment gateway");

        assert_eq!(
            payment_gateway.rpc_client.url(),
            "http://example.com:18081/"
        );
    }
}