1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use ed25519_compact::Signature;
use crate::balance_cache::BalanceCache;
use crate::constants::{MIN_AMOUNT_CRUZBITS, MIN_FEE_CRUZBITS};
use crate::ledger_disk::LedgerDisk;
use crate::processor::Processor;
use crate::transaction::{Transaction, TransactionID};
use crate::transaction_queue::{TransactionQueue, TransactionQueueError};
/// An in-memory FIFO implementation of the TransactionQueue interface.
///
/// Each field sits behind its own `RwLock`, so a method that touches more
/// than one field holds several guards at the same time.
pub struct TransactionQueueMemory {
/// Queued transactions, keyed by transaction ID.
tx_map: RwLock<HashMap<TransactionID, Transaction>>,
/// IDs of the queued transactions in queue order; the front is handed out first.
tx_queue: RwLock<VecDeque<TransactionID>>,
/// Balance cache that queued transactions are applied to when they are checked.
balance_cache: RwLock<BalanceCache>,
}
impl TransactionQueueMemory {
    /// Returns a new, empty `TransactionQueueMemory` instance.
    ///
    /// The balance cache is built on top of `ledger` and is given a minimum
    /// balance of `MIN_AMOUNT_CRUZBITS + MIN_FEE_CRUZBITS`.
    pub fn new(ledger: Arc<LedgerDisk>) -> Arc<Self> {
        // don't accept transactions that would leave an unspendable balance with this node
        let min_balance = MIN_AMOUNT_CRUZBITS + MIN_FEE_CRUZBITS;

        Arc::new(Self {
            tx_map: RwLock::new(HashMap::new()),
            tx_queue: RwLock::new(VecDeque::new()),
            balance_cache: RwLock::new(BalanceCache::new(ledger, min_balance)),
        })
    }

    /// Rebuilds the balance cache and removes transactions now in violation.
    ///
    /// `height` is the current chain height; every queued transaction is
    /// re-validated as if it were included in the block at `height + 1`.
    /// A transaction is dropped when its series check fails, it is not yet
    /// mature, it has expired, its fee or amount is below the minimum, or the
    /// balance cache refuses to apply it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the balance cache. In that case the
    /// function returns early: nothing is removed from the queue and the
    /// cache is left partially rebuilt.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned or if the queue holds an ID that is
    /// missing from the transaction map.
    fn reprocess_queue(&self, height: u64) -> Result<(), TransactionQueueError> {
        // Take every lock up front, always in the order
        // balance_cache -> tx_queue -> tx_map, so that concurrent callers
        // cannot deadlock against each other.
        let mut balance_cache = self.balance_cache.write().unwrap();
        let mut tx_queue = self.tx_queue.write().unwrap();
        let mut tx_map = self.tx_map.write().unwrap();

        // invalidate the cache
        balance_cache.reset();

        let next_height = height + 1;

        // A set gives O(1) membership tests when filtering the queue below.
        let mut invalidated = std::collections::HashSet::new();

        for tx_id in tx_queue.iter() {
            let tx = tx_map.get(tx_id).expect("tx queue to contain id");

            // check that the series would still be valid
            let still_valid = Processor::check_transaction_series(tx, next_height)
                // check maturity and expiration if included in the next block
                && tx.is_mature(next_height)
                && !tx.is_expired(next_height)
                // don't re-mine any now unconfirmed spam
                && tx.fee.unwrap_or(0) >= MIN_FEE_CRUZBITS
                && tx.amount >= MIN_AMOUNT_CRUZBITS;

            // Only transactions that pass the static checks are applied to
            // the balance cache; a refused application also invalidates them.
            if !still_valid || !balance_cache.apply(tx)? {
                invalidated.insert(*tx_id);
            }
        }

        if !invalidated.is_empty() {
            // only retain elements that haven't been selected for removal
            tx_queue.retain(|tx_id| !invalidated.contains(tx_id));
            for tx_id in &invalidated {
                tx_map.remove(tx_id);
            }
        }

        Ok(())
    }
}
// Locking discipline for this impl: whenever more than one lock is held they
// are acquired in the order balance_cache -> tx_queue -> tx_map. Mixing
// orders between methods lets two threads each hold a lock the other needs.
impl TransactionQueue for TransactionQueueMemory {
    /// Adds the transaction to the queue. Returns true if the transaction was added to the queue on this call.
    ///
    /// Returns `Ok(false)` when a transaction with the same ID is already
    /// queued. Otherwise the transaction is applied to the balance cache and
    /// appended to the back of the queue.
    ///
    /// # Errors
    ///
    /// Returns `TransactionQueueError::SenderBalanceInsufficient` when the
    /// balance cache refuses the transaction, and propagates any error the
    /// balance cache itself reports.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned, or if the balance check fails for a
    /// transaction that has no sender.
    fn add(&self, id: &TransactionID, tx: &Transaction) -> Result<bool, TransactionQueueError> {
        let mut balance_cache = self.balance_cache.write().unwrap();
        let mut tx_queue = self.tx_queue.write().unwrap();
        let mut tx_map = self.tx_map.write().unwrap();

        if tx_map.contains_key(id) {
            return Ok(false);
        }

        // check sender balance and update sender and receiver balances
        if !balance_cache.apply(tx)? {
            // insufficient sender balance
            return Err(TransactionQueueError::SenderBalanceInsufficient(
                *id,
                tx.from.expect("transaction should have a sender"),
            ));
        }

        // add to the back of the queue
        tx_queue.push_back(*id);
        tx_map.insert(*id, tx.clone());

        Ok(true)
    }

    /// Returns the queue length.
    fn len(&self) -> usize {
        self.tx_queue.read().unwrap().len()
    }

    /// Returns true if the queue has a length of 0.
    fn is_empty(&self) -> bool {
        self.tx_queue.read().unwrap().is_empty()
    }

    /// Returns transactions in the queue for the miner.
    ///
    /// At most `limit` queue entries are considered, starting at the front;
    /// the returned transactions are clones of the queued ones.
    fn get(&self, limit: usize) -> Vec<Transaction> {
        let tx_queue = self.tx_queue.read().unwrap();
        let tx_map = self.tx_map.read().unwrap();

        tx_queue
            .iter()
            .take(limit)
            .filter_map(|tx_id| tx_map.get(tx_id).cloned())
            .collect()
    }

    /// Adds a batch of transactions to the queue (a block has been disconnected.)
    ///
    /// `ids[i]` is the ID of `txs[i]`. The batch is placed at the front of
    /// the queue in its original order; a transaction that is already queued
    /// is moved to its new position rather than duplicated.
    ///
    /// # Panics
    ///
    /// Panics if `ids` is shorter than `txs`, or if a lock is poisoned.
    fn add_batch(&self, ids: &[TransactionID], txs: &[Transaction]) {
        // Validate before taking any lock: a panic while a write guard is
        // held would poison the lock for every later caller.
        assert!(
            ids.len() >= txs.len(),
            "add_batch: {} ids supplied for {} transactions",
            ids.len(),
            txs.len()
        );

        let mut tx_queue = self.tx_queue.write().unwrap();
        let mut tx_map = self.tx_map.write().unwrap();

        // add to front in reverse order.
        // we want formerly confirmed transactions to have the highest
        // priority for getting into the next block.
        for (tx_id, tx) in ids.iter().zip(txs).rev() {
            // `insert` reports whether the ID was already present
            if tx_map.insert(*tx_id, tx.clone()).is_some() {
                if let Some(index) = tx_queue.iter().position(|queued_id| queued_id == tx_id) {
                    // remove it from its current position
                    tx_queue.remove(index);
                }
            }
            tx_queue.push_front(*tx_id);
        }

        // we don't want to invalidate anything based on maturity/expiration/balance yet.
        // if we're disconnecting a block we're going to be connecting some shortly.
    }

    /// Removes a batch of transactions from the queue (a block has been connected.)
    /// "height" is the block chain height after this connection.
    /// "more" indicates if more connections are coming.
    ///
    /// # Errors
    ///
    /// When `more` is false the remaining queue is reprocessed and any error
    /// from that step is returned.
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned.
    fn remove_batch(
        &self,
        ids: &[TransactionID],
        height: u64,
        more: bool,
    ) -> Result<(), TransactionQueueError> {
        if !ids.is_empty() {
            // Build the lookup set before locking so the queue filter is
            // O(queue length) instead of O(queue length * batch size).
            let confirmed: std::collections::HashSet<TransactionID> =
                ids.iter().copied().collect();

            // The guards live only for this scope; they must be released
            // before reprocess_queue takes the same locks again.
            let mut tx_queue = self.tx_queue.write().unwrap();
            let mut tx_map = self.tx_map.write().unwrap();

            // remove the transactions from the queue
            tx_queue.retain(|tx_id| !confirmed.contains(tx_id));
            for tx_id in ids {
                tx_map.remove(tx_id);
            }
        }

        if more {
            // we don't want to invalidate anything based on series/maturity/expiration/balance
            // until we're done connecting all of the blocks we intend to
            Ok(())
        } else {
            self.reprocess_queue(height)
        }
    }

    /// Returns true if the given transaction is in the queue.
    fn exists(&self, id: &TransactionID) -> bool {
        self.tx_map.read().unwrap().contains_key(id)
    }

    /// Return true if the given transaction is in the queue and contains the given signature.
    ///
    /// # Panics
    ///
    /// Panics if the queued transaction has no signature, or if the lock is
    /// poisoned.
    fn exists_signed(&self, id: &TransactionID, signature: Signature) -> bool {
        match self.tx_map.read().unwrap().get(id) {
            Some(tx) => tx.signature.expect("transaction should have a signature") == signature,
            None => false,
        }
    }
}