twelvepool 0.5.0

Watch for new txs in a Terra node mempool
Documentation
use std::collections::HashMap;
use std::vec;

use futures::{stream, StreamExt};
use reqwest::Client;
use tokio::sync::mpsc;
use tokio::time;

use crate::cache::Cache;
use crate::mempoolitem::MempoolItem;
use crate::terra::Terra;
use crate::tx::Tx;

#[derive(Debug)]
pub struct Watcher {
    terra: Terra,
    new_txs: Vec<Tx>,
    cached_txs: HashMap<String, Tx>,
    cache: Cache,
    interval: time::Interval,
}

impl Watcher {
    pub fn new(
        rpc_url: String,
        lcd_url: String,
        http_client: Option<Client>,
        interval_duration: Option<time::Duration>,
    ) -> Watcher {
        Watcher {
            terra: Terra::new(rpc_url, lcd_url, http_client.unwrap_or_default()),
            new_txs: vec![],
            cached_txs: HashMap::new(),
            cache: Cache::new(30),
            interval: time::interval(
                interval_duration.unwrap_or_else(|| time::Duration::from_millis(100)),
            ),
        }
    }

    pub fn run(mut self) -> mpsc::UnboundedReceiver<MempoolItem> {
        let (sender, receiver) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            loop {
                match self.terra.get_unconfirmed_txs().await {
                    Ok(tx_strings) => {
                        let mut raw_txs = self.get_tx_hashes(tx_strings).await;
                        raw_txs.retain(|tx_hash, _| {
                            if self.cache.get(tx_hash).is_none() {
                                true
                            } else {
                                log::debug!("tx {} already sent", tx_hash);
                                false
                            }
                        });

                        let txs = self.get_decoded_txs(raw_txs).await;
                        txs.iter().for_each(|(tx_hash, tx)| {
                            match sender.send(MempoolItem::new(tx.clone(), tx_hash.clone())) {
                                Ok(_) => {
                                    log::info!("new tx {}", tx_hash);
                                    self.cache.set(tx_hash.clone(), tx.clone());
                                }
                                Err(err) => {
                                    log::error!("couldn't send tx {}: {}", tx_hash, err)
                                }
                            }
                        });
                    }
                    Err(err) => log::error!("couldn't get unconfirmed txs: {}", err),
                }
                let cleaned = self.cache.clear_expired();
                log::debug!("cleaned {} tx from cache", cleaned);
                self.interval.tick().await;
            }
        });

        receiver
    }

    async fn get_tx_hashes(&self, tx_strings: Vec<String>) -> HashMap<String, String> {
        let mut raw_txs: HashMap<String, String> = HashMap::new();

        let items: Vec<Option<(String, String)>> = stream::iter(tx_strings)
            .map(|tx_string| async move {
                if let Ok(tx_hash) = self.terra.get_tx_hash(&tx_string).await {
                    Some((tx_hash, tx_string))
                } else {
                    None
                }
            })
            .buffered(usize::MAX)
            .collect()
            .await;

        items.into_iter().for_each(|item| {
            if let Some((tx_hash, tx_string)) = item {
                raw_txs.insert(tx_hash, tx_string);
            };
        });

        raw_txs
    }

    async fn get_decoded_txs(&self, raw_txs: HashMap<String, String>) -> HashMap<String, Tx> {
        let mut txs: HashMap<String, Tx> = HashMap::new();
        let items: Vec<Option<(String, Tx)>> = stream::iter(raw_txs)
            .map(|(tx_hash, tx_string)| async move {
                if let Ok(tx) = self.terra.decode_tx(&tx_string).await {
                    Some((tx_hash, tx))
                } else {
                    None
                }
            })
            .buffered(usize::MAX)
            .collect()
            .await;

        items.into_iter().for_each(|item| {
            if let Some((tx_hash, tx)) = item {
                txs.insert(tx_hash, tx);
            };
        });

        txs
    }
}