1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
pub(self) mod connection_handler;
pub(self) mod message_handler;
pub mod miner_instance;
pub use miner_instance::*;
pub mod server;
pub use server::*;
use crate::{
message::types::{Block, Transaction},
Context,
};
use snarkos_consensus::{
memory_pool::{Entry, MemoryPool},
ConsensusParameters,
MerkleTreeLedger,
};
use snarkos_dpc::base_dpc::{
instantiated::{Components, Tx},
parameters::PublicParameters,
};
use snarkos_errors::network::SendError;
use snarkos_utilities::bytes::FromBytes;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::Mutex;
pub async fn process_transaction_internal(
context: Arc<Context>,
consensus: &ConsensusParameters,
parameters: &PublicParameters<Components>,
storage: Arc<MerkleTreeLedger>,
memory_pool_lock: Arc<Mutex<MemoryPool<Tx>>>,
transaction_bytes: Vec<u8>,
transaction_sender: SocketAddr,
) -> Result<(), SendError> {
if let Ok(transaction) = Tx::read(&transaction_bytes[..]) {
let mut memory_pool = memory_pool_lock.lock().await;
if !consensus.verify_transaction(parameters, &transaction, &storage)? {
error!("Received transaction was invalid");
return Ok(());
}
if transaction.value_balance.is_negative() {
error!("Received transaction was a coinbase transaction");
return Ok(());
}
let entry = Entry::<Tx> {
size: transaction_bytes.len(),
transaction,
};
if let Ok(inserted) = memory_pool.insert(&storage, entry) {
if inserted.is_some() {
info!("Transaction added to mempool. Propagating transaction to peers");
for (socket, _) in &context.peer_book.read().await.get_connected() {
if *socket != transaction_sender && *socket != *context.local_address.read().await {
if let Some(channel) = context.connections.read().await.get(socket) {
channel.write(&Transaction::new(transaction_bytes.clone())).await?;
}
}
}
}
}
}
Ok(())
}
pub async fn propagate_block(context: Arc<Context>, data: Vec<u8>, block_miner: SocketAddr) -> Result<(), SendError> {
info!("Propagating block to peers");
for (socket, _) in &context.peer_book.read().await.get_connected() {
if *socket != block_miner && *socket != *context.local_address.read().await {
if let Some(channel) = context.connections.read().await.get(socket) {
channel.write(&Block::new(data.clone())).await?;
}
}
}
Ok(())
}