use chrono::prelude::Utc;
use rand::{thread_rng, Rng};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crate::common::adapters::DandelionAdapter;
use crate::core::core::hash::Hashed;
use crate::core::core::transaction;
use crate::pool::{BlockChain, DandelionConfig, Pool, PoolEntry, PoolError, TxSource};
use crate::util::StopState;
use crate::ServerTxPool;
/// Spawn the "dandelion" monitor thread.
///
/// The thread wakes once per second and, after every `run_interval`
/// (10s), drives the Dandelion state machine:
/// - when we are not currently stemming, attempt to fluff the local
///   stempool via [`process_fluff_phase`];
/// - regardless of phase, fluff any txs whose embargo has expired via
///   [`process_expired_entries`];
/// - rotate to the next epoch once the adapter reports the current one
///   has expired.
///
/// Returns the `JoinHandle` of the spawned thread; the thread exits
/// cleanly once `stop_state` is flagged.
pub fn monitor_transactions(
	dandelion_config: DandelionConfig,
	tx_pool: ServerTxPool,
	adapter: Arc<dyn DandelionAdapter>,
	stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> {
	debug!("Started Dandelion transaction monitor.");
	thread::Builder::new()
		.name("dandelion".to_string())
		.spawn(move || {
			let run_interval = Duration::from_secs(10);
			// Backdate the initial "last run" so the first pass fires
			// immediately instead of waiting a full interval.
			let mut last_run = Instant::now()
				.checked_sub(Duration::from_secs(20))
				.unwrap_or_else(Instant::now);
			while !stop_state.is_stopped() {
				if last_run.elapsed() > run_interval {
					// Fluff our own stempool only when in fluff phase.
					if !adapter.is_stem() {
						if let Err(e) = process_fluff_phase(&dandelion_config, &tx_pool, &adapter) {
							error!("dand_mon: Problem processing fluff phase. {:?}", e);
						}
					}
					if let Err(e) = process_expired_entries(&dandelion_config, &tx_pool) {
						error!("dand_mon: Problem processing expired entries. {:?}", e);
					}
					if adapter.is_expired() {
						adapter.next_epoch();
					}
					last_run = Instant::now();
				}
				thread::sleep(Duration::from_secs(1));
			}
		})
}
/// Return clones of the pool entries that are older than `cutoff_secs`
/// seconds (compared against the entry's `tx_at` timestamp, UTC).
fn select_txs_cutoff<B>(pool: &Pool<B>, cutoff_secs: u16) -> Vec<PoolEntry>
where
	B: BlockChain,
{
	let threshold = Utc::now().timestamp() - i64::from(cutoff_secs);
	let mut selected = Vec::new();
	for entry in pool.entries.iter() {
		if entry.tx_at.timestamp() < threshold {
			selected.push(entry.clone());
		}
	}
	selected
}
/// Attempt to fluff (broadcast) the contents of the local stempool.
///
/// Does nothing while the stempool is empty, or while no entry has yet
/// aged past the aggregation window (unless the current epoch has
/// expired, in which case we fluff immediately). All stempool txs are
/// re-validated against the aggregated txpool contents and the current
/// chain head, then aggregated into a single tx and moved into the
/// main txpool under `TxSource::Fluff`.
fn process_fluff_phase(
	dandelion_config: &DandelionConfig,
	tx_pool: &ServerTxPool,
	adapter: &Arc<dyn DandelionAdapter>,
) -> Result<(), PoolError> {
	let mut tx_pool = tx_pool.write();
	let stem_entries = tx_pool.stempool.entries.clone();
	if stem_entries.is_empty() {
		return Ok(());
	}
	// Entries that have waited at least the full aggregation window.
	let aged_entries = select_txs_cutoff(&tx_pool.stempool, dandelion_config.aggregation_secs);
	// Keep aggregating until the window elapses or the epoch is over.
	if !adapter.is_expired() && aged_entries.is_empty() {
		return Ok(());
	}
	let header = tx_pool.chain_head()?;
	// Re-validate the stempool txs against everything already in the
	// txpool so we only fluff what is still valid.
	let txpool_tx = tx_pool.txpool.all_transactions_aggregate(None)?;
	let candidate_txs: Vec<_> = stem_entries.into_iter().map(|entry| entry.tx).collect();
	let fluffable_txs = tx_pool.stempool.validate_raw_txs(
		&candidate_txs,
		txpool_tx,
		&header,
		transaction::Weighting::NoLimit,
	)?;
	debug!(
		"dand_mon: Found {} txs in local stempool to fluff",
		fluffable_txs.len()
	);
	// Single aggregated tx for the whole stempool; validated before add.
	let agg_tx = transaction::aggregate(&fluffable_txs)?;
	agg_tx.validate(transaction::Weighting::AsTransaction)?;
	tx_pool.add_to_pool(TxSource::Fluff, agg_tx, false, &header)?;
	Ok(())
}
/// Fluff any stempool txs whose embargo timer has expired.
///
/// The embargo is the configured `embargo_secs` plus a random 0-30s of
/// per-call fuzz so that nodes do not all reveal an embargoed tx at the
/// same moment. Each expired tx is promoted to the main txpool
/// individually under `TxSource::EmbargoExpired`; a failure is logged
/// and does not abort processing of the remaining entries.
fn process_expired_entries(
	dandelion_config: &DandelionConfig,
	tx_pool: &ServerTxPool,
) -> Result<(), PoolError> {
	let mut pool = tx_pool.write();
	let fuzz = thread_rng().gen_range(0, 31);
	let embargo = dandelion_config.embargo_secs + fuzz;
	let expired = select_txs_cutoff(&pool.stempool, embargo);
	if expired.is_empty() {
		return Ok(());
	}
	debug!("dand_mon: Found {} expired txs.", expired.len());
	let header = pool.chain_head()?;
	for entry in expired {
		let txhash = entry.tx.hash();
		let result = pool.add_to_pool(TxSource::EmbargoExpired, entry.tx, false, &header);
		match result {
			Ok(_) => info!(
				"dand_mon: embargo expired for {}, fluffed successfully.",
				txhash
			),
			Err(e) => warn!("dand_mon: failed to fluff expired tx {}, {:?}", txhash, e),
		}
	}
	Ok(())
}