use crate::{
block::Block,
blockchain::Blockchain,
burnfee::BurnFee,
consensus::SaitoMessage,
crypto::{SaitoHash, SaitoPrivateKey, SaitoPublicKey},
golden_ticket::GoldenTicket,
time::create_timestamp,
transaction::Transaction,
wallet::Wallet,
};
use log::info;
use std::{collections::HashMap, collections::VecDeque, sync::Arc, thread::sleep, time::Duration};
use tokio::sync::{broadcast, mpsc, RwLock};
/// Internal messages exchanged on the mempool's private `mpsc` channel.
#[derive(Clone, Debug)]
pub enum MempoolMessage {
    /// Asks the mempool loop to try to bundle a block from pending transactions.
    LocalTryBundleBlock,
    /// Signals that a block was queued and should be handed to the blockchain.
    LocalNewBlock,
}
/// Holds pending transactions and locally produced blocks until they are
/// handed over to the blockchain.
#[derive(Debug)]
pub struct Mempool {
    /// Blocks waiting to be passed to the blockchain (FIFO).
    blocks_queue: VecDeque<Block>,
    /// Transactions waiting to be included in a block.
    pub transactions: Vec<Transaction>,
    /// Running total of routing work, as measured for `mempool_publickey`,
    /// over the transactions currently held.
    routing_work_in_mempool: u64,
    /// Shared handle to the node's wallet.
    wallet_lock: Arc<RwLock<Wallet>>,
    /// Set while queued blocks are being sent to the blockchain; bundling is
    /// refused while it is `true`.
    currently_bundling_block: bool,
    /// Broadcast channel handle, `None` until one is provided via the setter.
    broadcast_channel_sender: Option<broadcast::Sender<SaitoMessage>>,
    /// Public key used when computing routing work for this mempool.
    mempool_publickey: SaitoPublicKey,
    /// Private key matching `mempool_publickey`.
    mempool_privatekey: SaitoPrivateKey,
}
impl Mempool {
#[allow(clippy::new_without_default)]
pub fn new(wallet_lock: Arc<RwLock<Wallet>>) -> Self {
Mempool {
blocks_queue: VecDeque::new(),
transactions: vec![],
routing_work_in_mempool: 0,
wallet_lock,
currently_bundling_block: false,
broadcast_channel_sender: None,
mempool_publickey: [0; 33],
mempool_privatekey: [0; 32],
}
}
/// Queues `block` for delivery to the blockchain.
///
/// A block whose hash matches one already in the queue is silently dropped.
pub fn add_block(&mut self, block: Block) {
    let incoming_hash = block.get_hash();
    let already_queued = self
        .blocks_queue
        .iter()
        .any(|queued| queued.get_hash() == incoming_hash);
    if !already_queued {
        self.blocks_queue.push_back(block);
    }
}
/// Wraps `golden_ticket` in a transaction created by the wallet and adds it
/// to the pending transactions.
///
/// The transaction is discarded if the mempool already holds a golden-ticket
/// transaction. The wallet write lock is held for the whole call.
pub async fn add_golden_ticket(&mut self, golden_ticket: GoldenTicket) {
    let mut wallet = self.wallet_lock.write().await;
    let golden_ticket_tx = wallet.create_golden_ticket_transaction(golden_ticket).await;

    let has_golden_ticket = self
        .transactions
        .iter()
        .any(|pending| pending.is_golden_ticket());
    if has_golden_ticket {
        return;
    }

    info!("adding golden ticket to mempool...");
    self.transactions.push(golden_ticket_tx);
}
/// Adds `transaction` to the mempool only if it validates against the
/// blockchain's current UTXO set and staking state.
///
/// The blockchain read lock is held until the call returns.
pub async fn add_transaction_if_validates(
    &mut self,
    transaction: Transaction,
    blockchain_lock: Arc<RwLock<Blockchain>>,
) {
    let blockchain = blockchain_lock.read().await;
    let is_valid = transaction.validate(&blockchain.utxoset, &blockchain.staking);
    if !is_valid {
        return;
    }
    self.add_transaction(transaction).await;
}
/// Adds `transaction` to the pending list unless one with the same signature
/// is already present.
///
/// Metadata is generated with the wallet's public key, while the routing
/// work credited to the mempool is measured for `mempool_publickey`.
pub async fn add_transaction(&mut self, mut transaction: Transaction) {
    info!("add_transaction {:?}", transaction.get_transaction_type());

    let signature = transaction.get_signature();

    // The read guard is a temporary, so the wallet lock is released as soon
    // as the key has been copied out.
    let wallet_publickey = self.wallet_lock.read().await.get_publickey();
    transaction.generate_metadata(wallet_publickey);

    let routing_work = transaction.get_routing_work_for_publickey(self.mempool_publickey);

    let already_known = self
        .transactions
        .iter()
        .any(|known| known.get_signature() == signature);
    if already_known {
        return;
    }

    self.transactions.push(transaction);
    self.routing_work_in_mempool += routing_work;
}
/// Generates a new block on top of the blockchain's latest block.
///
/// `Block::generate` is given mutable access to the pending transactions
/// (presumably so it can take them into the block — confirm against
/// `Block::generate`). Afterwards the block's metadata is generated and the
/// mempool's routing-work counter is reset to zero.
pub async fn bundle_block(
    &mut self,
    blockchain_lock: Arc<RwLock<Blockchain>>,
    current_timestamp: u64,
) -> Block {
    // NOTE(review): this read guard stays alive while `Block::generate` runs
    // with a clone of the same lock. If `generate` locks the blockchain
    // itself, the read locks nest — confirm that this is intended.
    let chain = blockchain_lock.read().await;
    let parent_hash = chain.get_latest_block_hash();
    let wallet_lock = self.wallet_lock.clone();

    let mut new_block = Block::generate(
        &mut self.transactions,
        parent_hash,
        wallet_lock,
        blockchain_lock.clone(),
        current_timestamp,
    )
    .await;
    new_block.generate_metadata();

    self.routing_work_in_mempool = 0;
    new_block
}
/// Reports whether the mempool may produce a block at `current_timestamp`.
///
/// Returns `false` while blocks are being sent to the blockchain or when no
/// transactions are pending. If the blockchain has no latest block yet,
/// returns `true`. Otherwise returns whether the routing work available in
/// the mempool meets the work needed on top of the latest block.
pub async fn can_bundle_block(
    &self,
    blockchain_lock: Arc<RwLock<Blockchain>>,
    current_timestamp: u64,
) -> bool {
    if self.currently_bundling_block || self.transactions.is_empty() {
        return false;
    }

    let blockchain = blockchain_lock.read().await;
    if let Some(previous_block) = blockchain.get_latest_block() {
        let work_available = self.get_routing_work_available();
        let work_needed = self.get_routing_work_needed(previous_block, current_timestamp);
        // The latest block may carry a timestamp ahead of the local clock;
        // saturate so the subtraction cannot underflow (which would panic in
        // debug builds and log a wrapped value in release builds).
        let time_elapsed = current_timestamp.saturating_sub(previous_block.get_timestamp());
        info!(
            "can_bundle_block. work available: {:?} -- work needed: {:?} -- time elapsed: {:?} ",
            work_available,
            work_needed,
            time_elapsed
        );
        work_available >= work_needed
    } else {
        true
    }
}
/// Removes every pending transaction whose signature hash matches one of
/// `transactions`, then recomputes the routing-work total from what remains.
pub fn delete_transactions(&mut self, transactions: &Vec<Transaction>) {
    // Collect the hashes once so each pending transaction needs one lookup.
    let hashes_to_remove: std::collections::HashSet<_> = transactions
        .iter()
        .map(|transaction| transaction.get_hash_for_signature())
        .collect();

    self.transactions
        .retain(|pending| !hashes_to_remove.contains(&pending.get_hash_for_signature()));

    let publickey = self.mempool_publickey;
    self.routing_work_in_mempool = self
        .transactions
        .iter()
        .map(|pending| pending.get_routing_work_for_publickey(publickey))
        .sum();
}
/// Returns the routing work currently accumulated in the mempool.
pub fn get_routing_work_available(&self) -> u64 {
    // The counter is unsigned, so it is either positive or already zero.
    self.routing_work_in_mempool
}
/// Returns the routing work required to produce a block at
/// `current_timestamp` on top of `previous_block`, as computed by `BurnFee`
/// from the previous block's burn fee and timestamp.
pub fn get_routing_work_needed(&self, previous_block: &Block, current_timestamp: u64) -> u64 {
    BurnFee::return_routing_work_needed_to_produce_block_in_nolan(
        previous_block.get_burnfee(),
        current_timestamp,
        previous_block.get_timestamp(),
    )
}
/// Stores the broadcast channel sender on the mempool, replacing any
/// previously stored sender.
pub fn set_broadcast_channel_sender(&mut self, bcs: broadcast::Sender<SaitoMessage>) {
self.broadcast_channel_sender = Some(bcs);
}
/// Sets the public key used when computing routing work for this mempool.
pub fn set_mempool_publickey(&mut self, publickey: SaitoPublicKey) {
self.mempool_publickey = publickey;
}
/// Sets the private key stored on the mempool.
pub fn set_mempool_privatekey(&mut self, privatekey: SaitoPrivateKey) {
self.mempool_privatekey = privatekey;
}
/// Drains the mempool's block queue into the blockchain.
///
/// Both the mempool and the blockchain are write-locked for the duration of
/// the call. For each queued block, its transactions are removed from the
/// pending list before the block is added to the blockchain.
/// `currently_bundling_block` is set while the queue is being drained.
pub async fn send_blocks_to_blockchain(
    mempool_lock: Arc<RwLock<Mempool>>,
    blockchain_lock: Arc<RwLock<Blockchain>>,
) {
    let mut mempool_guard = mempool_lock.write().await;
    mempool_guard.currently_bundling_block = true;

    let mut blockchain_guard = blockchain_lock.write().await;
    while let Some(queued_block) = mempool_guard.blocks_queue.pop_front() {
        mempool_guard.delete_transactions(&queued_block.get_transactions());
        blockchain_guard.add_block(queued_block).await;
    }

    mempool_guard.currently_bundling_block = false;
}
/// Returns `true` if a pending transaction has the signature hash `tx_hash`.
pub fn transaction_exists(&self, tx_hash: Option<SaitoHash>) -> bool {
    for pending in &self.transactions {
        if pending.get_hash_for_signature() == tx_hash {
            return true;
        }
    }
    false
}
}
/// Bundles and returns a block if the mempool reports that it can, otherwise
/// returns `None`.
///
/// The check runs under a mempool read lock, which is released before the
/// write lock is taken for bundling.
pub async fn try_bundle_block(
    mempool_lock: Arc<RwLock<Mempool>>,
    blockchain_lock: Arc<RwLock<Blockchain>>,
    current_timestamp: u64,
) -> Option<Block> {
    info!("try_bundle_block");

    let mempool_reader = mempool_lock.read().await;
    let can_bundle = mempool_reader
        .can_bundle_block(blockchain_lock.clone(), current_timestamp)
        .await;
    // Release the read lock before asking for the write lock below.
    drop(mempool_reader);

    if !can_bundle {
        return None;
    }

    let mut mempool_writer = mempool_lock.write().await;
    let block = mempool_writer
        .bundle_block(blockchain_lock.clone(), current_timestamp)
        .await;
    Some(block)
}
pub async fn run(
mempool_lock: Arc<RwLock<Mempool>>,
blockchain_lock: Arc<RwLock<Blockchain>>,
broadcast_channel_sender: broadcast::Sender<SaitoMessage>,
mut broadcast_channel_receiver: broadcast::Receiver<SaitoMessage>,
) -> crate::Result<()> {
{
let mut mempool = mempool_lock.write().await;
let publickey;
let privatekey;
{
let wallet = mempool.wallet_lock.read().await;
publickey = wallet.get_publickey();
privatekey = wallet.get_privatekey();
}
mempool.set_broadcast_channel_sender(broadcast_channel_sender.clone());
mempool.set_mempool_publickey(publickey);
mempool.set_mempool_privatekey(privatekey);
}
let (mempool_channel_sender, mut mempool_channel_receiver) = mpsc::channel(4);
let bundle_block_sender = mempool_channel_sender.clone();
tokio::spawn(async move {
loop {
bundle_block_sender
.send(MempoolMessage::LocalTryBundleBlock)
.await
.expect("error: LocalTryBundleBlock message failed to send");
sleep(Duration::from_millis(1000));
}
});
loop {
tokio::select! {
Some(message) = mempool_channel_receiver.recv() => {
match message {
MempoolMessage::LocalTryBundleBlock => {
let current_timestamp = create_timestamp();
if let Some(block) = try_bundle_block(
mempool_lock.clone(),
blockchain_lock.clone(),
current_timestamp,
).await {
let mut mempool = mempool_lock.write().await;
mempool.add_block(block);
mempool_channel_sender.send(MempoolMessage::LocalNewBlock).await.expect("Failed to send LocalNewBlock message");
}
},
MempoolMessage::LocalNewBlock => {
Mempool::send_blocks_to_blockchain(mempool_lock.clone(), blockchain_lock.clone()).await;
},
}
}
Ok(message) = broadcast_channel_receiver.recv() => {
match message {
SaitoMessage::MinerNewGoldenTicket { ticket : golden_ticket } => {
let mut mempool = mempool_lock.write().await;
mempool.add_golden_ticket(golden_ticket).await;
},
_ => {},
}
}
}
}
}
// Unit and integration tests for the mempool.
#[cfg(test)]
mod tests {
use super::*;
use crate::{block::Block, test_utilities::test_manager::TestManager, wallet::Wallet};
use std::sync::Arc;
use tokio::sync::RwLock;
// A freshly constructed mempool has an empty block queue.
#[test]
fn mempool_new_test() {
let wallet = Wallet::new();
let mempool = Mempool::new(Arc::new(RwLock::new(wallet)));
assert_eq!(mempool.blocks_queue, VecDeque::new());
}
// A block passed to `add_block` is the next one popped from the queue.
#[test]
fn mempool_add_block_test() {
let wallet = Wallet::new();
let mut mempool = Mempool::new(Arc::new(RwLock::new(wallet)));
let block = Block::new();
mempool.add_block(block.clone());
assert_eq!(Some(block), mempool.blocks_queue.pop_front())
}
// Adds one block through the test manager, then four times generates a
// block via the mempool, queues it and sends it to the blockchain, and
// finally runs the test manager's blockchain check.
#[tokio::test]
#[serial_test::serial]
async fn mempool_bundle_blocks_test() {
let wallet_lock = Arc::new(RwLock::new(Wallet::new()));
let blockchain_lock = Arc::new(RwLock::new(Blockchain::new(wallet_lock.clone())));
let mut test_manager = TestManager::new(blockchain_lock.clone(), wallet_lock.clone());
let mempool_lock = test_manager.mempool_lock.clone();
test_manager
.add_block(create_timestamp(), 3, 0, false, vec![])
.await;
// Scope the read guard so it is released before the write locks taken
// further down.
{
let blockchain = blockchain_lock.read().await;
assert_eq!(blockchain.get_latest_block_id(), 1);
}
for _i in 0..4 {
let block = test_manager.generate_block_via_mempool().await;
// Release the mempool write guard before `send_blocks_to_blockchain`
// takes its own write lock on the same mempool.
{
let mut mempool = test_manager.mempool_lock.write().await;
mempool.add_block(block);
}
Mempool::send_blocks_to_blockchain(mempool_lock.clone(), blockchain_lock.clone()).await;
}
test_manager.check_blockchain().await;
}
}