snap_coin/node/
mempool.rs1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use tokio::{sync::RwLock, time::sleep};
4
5use crate::{
6 core::transaction::{Transaction, TransactionId},
7 economics::EXPIRATION_TIME,
8};
9
10pub struct MemPool {
11 pending: Arc<RwLock<HashMap<u64, Vec<Transaction>>>>,
13}
14
15impl MemPool {
16 pub fn new() -> Self {
17 MemPool {
18 pending: Arc::new(RwLock::new(HashMap::new())),
19 }
20 }
21
22 pub fn start_expiry_watchdog(&mut self) {
23 let pending = self.pending.clone();
24 tokio::spawn(async move {
25 loop {
26 sleep(Duration::from_secs_f64(0.5)).await;
27 pending
28 .write()
29 .await
30 .remove(&(chrono::Utc::now().timestamp() as u64));
31 }
32 });
33 }
34
35 pub async fn get_mempool(&self) -> Vec<Transaction> {
37 self.pending
38 .read()
39 .await
40 .values()
41 .flat_map(|v| v.iter().map(|tx| tx.clone()))
42 .collect()
43 }
44
45 pub async fn add_transaction(&mut self, transaction: Transaction) {
48 let expiry = chrono::Utc::now().timestamp() as u64 + EXPIRATION_TIME;
49 if self.pending.read().await.contains_key(&expiry) {
50 self.pending
51 .write()
52 .await
53 .get_mut(&expiry)
54 .unwrap()
55 .push(transaction);
56 } else {
57 self.pending.write().await.insert(expiry, vec![transaction]);
58 }
59 }
60
61 pub async fn validate_transaction(&self, transaction: &Transaction) -> bool {
63 let mempool = self.get_mempool().await;
64 for mempool_transaction in mempool {
65 if transaction.inputs.iter().any(|i| {
66 mempool_transaction.inputs.iter().any(|mi| {
67 mi.output_index == i.output_index && mi.transaction_id == i.transaction_id
68 })
69 }) {
70 return false;
71 }
72 }
73 true
74 }
75
76 pub async fn spend_transactions(&self, transactions: Vec<TransactionId>) {
77 let mut pending = self.pending.write().await;
78
79 for txs in pending.values_mut() {
80 txs.retain(|mempool_tx| {
81 if let Some(id) = mempool_tx.transaction_id {
82 !transactions.contains(&id)
83 } else {
84 true
85 }
86 });
87 }
88
89 pending.retain(|_, txs| !txs.is_empty());
91 }
92}