guts_consensus/
mempool.rs1use crate::error::{ConsensusError, Result};
7use crate::transaction::{Transaction, TransactionId};
8use parking_lot::RwLock;
9use std::collections::{HashMap, VecDeque};
10use std::time::{Duration, Instant};
11
/// Tunable limits for a [`Mempool`].
#[derive(Debug, Clone)]
pub struct MempoolConfig {
    /// Upper bound on the number of pending transactions held at once.
    ///
    /// When the pool is at this size, adding a transaction evicts the oldest
    /// entries first.
    pub max_transactions: usize,

    /// Age beyond which a pending transaction is skipped when building a
    /// proposal and removed by `Mempool::reap_expired`.
    pub max_transaction_age: Duration,

    /// Largest number of transactions returned by a single
    /// `Mempool::get_for_proposal` call.
    pub max_transactions_per_block: usize,
}
24
25impl Default for MempoolConfig {
26 fn default() -> Self {
27 Self {
28 max_transactions: 10_000,
29 max_transaction_age: Duration::from_secs(600), max_transactions_per_block: 1000,
31 }
32 }
33}
34
35#[derive(Debug, Clone)]
37struct PendingTransaction {
38 transaction: Transaction,
40
41 added_at: Instant,
43
44 propose_count: u32,
46}
47
48pub struct Mempool {
50 config: MempoolConfig,
52
53 transactions: RwLock<HashMap<TransactionId, PendingTransaction>>,
55
56 order: RwLock<VecDeque<TransactionId>>,
58}
59
60impl Mempool {
61 pub fn new(config: MempoolConfig) -> Self {
63 Self {
64 config,
65 transactions: RwLock::new(HashMap::new()),
66 order: RwLock::new(VecDeque::new()),
67 }
68 }
69
70 pub fn with_defaults() -> Self {
72 Self::new(MempoolConfig::default())
73 }
74
75 pub fn add(&self, transaction: Transaction) -> Result<TransactionId> {
77 let id = transaction.id();
78
79 let mut txs = self.transactions.write();
80 let mut order = self.order.write();
81
82 if txs.contains_key(&id) {
84 return Err(ConsensusError::DuplicateTransaction(id.to_hex()));
85 }
86
87 while txs.len() >= self.config.max_transactions {
89 if let Some(old_id) = order.pop_front() {
90 txs.remove(&old_id);
91 tracing::debug!(?old_id, "evicted transaction due to mempool capacity");
92 } else {
93 break;
94 }
95 }
96
97 let pending = PendingTransaction {
99 transaction,
100 added_at: Instant::now(),
101 propose_count: 0,
102 };
103
104 txs.insert(id, pending);
105 order.push_back(id);
106
107 tracing::trace!(?id, "added transaction to mempool");
108
109 Ok(id)
110 }
111
112 pub fn get(&self, id: &TransactionId) -> Option<Transaction> {
114 self.transactions
115 .read()
116 .get(id)
117 .map(|p| p.transaction.clone())
118 }
119
120 pub fn contains(&self, id: &TransactionId) -> bool {
122 self.transactions.read().contains_key(id)
123 }
124
125 pub fn len(&self) -> usize {
127 self.transactions.read().len()
128 }
129
130 pub fn is_empty(&self) -> bool {
132 self.transactions.read().is_empty()
133 }
134
135 pub fn remove(&self, id: &TransactionId) -> Option<Transaction> {
137 let mut txs = self.transactions.write();
138 let mut order = self.order.write();
139
140 if let Some(pending) = txs.remove(id) {
141 order.retain(|tx_id| tx_id != id);
142 Some(pending.transaction)
143 } else {
144 None
145 }
146 }
147
148 pub fn remove_batch(&self, ids: &[TransactionId]) {
150 let mut txs = self.transactions.write();
151 let mut order = self.order.write();
152
153 for id in ids {
154 txs.remove(id);
155 }
156
157 order.retain(|tx_id| !ids.contains(tx_id));
158
159 tracing::debug!(count = ids.len(), "removed batch from mempool");
160 }
161
162 pub fn get_for_proposal(&self) -> Vec<Transaction> {
166 let now = Instant::now();
167 let mut txs = self.transactions.write();
168 let order = self.order.read();
169
170 let mut result = Vec::with_capacity(self.config.max_transactions_per_block);
171
172 for id in order.iter() {
173 if result.len() >= self.config.max_transactions_per_block {
174 break;
175 }
176
177 if let Some(pending) = txs.get_mut(id) {
178 if now.duration_since(pending.added_at) > self.config.max_transaction_age {
180 continue;
181 }
182
183 pending.propose_count += 1;
184 result.push(pending.transaction.clone());
185 }
186 }
187
188 result
189 }
190
191 pub fn reap_expired(&self) -> usize {
193 let now = Instant::now();
194 let mut txs = self.transactions.write();
195 let mut order = self.order.write();
196
197 let initial_len = txs.len();
198 let expired_ids: Vec<_> = txs
199 .iter()
200 .filter(|(_, pending)| {
201 now.duration_since(pending.added_at) > self.config.max_transaction_age
202 })
203 .map(|(id, _)| *id)
204 .collect();
205
206 for id in &expired_ids {
207 txs.remove(id);
208 }
209
210 order.retain(|id| !expired_ids.contains(id));
211
212 let removed = initial_len - txs.len();
213 if removed > 0 {
214 tracing::debug!(removed, "reaped expired transactions");
215 }
216
217 removed
218 }
219
220 pub fn stats(&self) -> MempoolStats {
222 let txs = self.transactions.read();
223 let now = Instant::now();
224
225 let mut oldest_age = Duration::ZERO;
226 let mut total_propose_count = 0u64;
227
228 for pending in txs.values() {
229 let age = now.duration_since(pending.added_at);
230 if age > oldest_age {
231 oldest_age = age;
232 }
233 total_propose_count += pending.propose_count as u64;
234 }
235
236 MempoolStats {
237 transaction_count: txs.len(),
238 oldest_transaction_age: oldest_age,
239 average_propose_count: if txs.is_empty() {
240 0.0
241 } else {
242 total_propose_count as f64 / txs.len() as f64
243 },
244 }
245 }
246}
247
/// Point-in-time summary of a mempool's contents, as produced by
/// `Mempool::stats`.
#[derive(Debug, Clone)]
pub struct MempoolStats {
    /// Number of transactions currently pending.
    pub transaction_count: usize,

    /// Age of the longest-waiting pending transaction; zero when the pool is
    /// empty.
    pub oldest_transaction_age: Duration,

    /// Mean number of times a pending transaction has been handed out for a
    /// proposal; `0.0` when the pool is empty.
    pub average_propose_count: f64,
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transaction::{SerializablePublicKey, SerializableSignature};
    use commonware_cryptography::{ed25519, PrivateKeyExt, Signer};

    /// Builds a `CreateRepository` transaction whose repository name and
    /// signing key both vary with `seed`. The signature covers a fixed test
    /// message, not the transaction contents.
    fn test_tx(seed: u64) -> Transaction {
        let key = ed25519::PrivateKey::from_seed(seed);
        let sig = key.sign(Some(b"_GUTS"), b"test");

        Transaction::CreateRepository {
            owner: "alice".into(),
            name: format!("repo-{}", seed),
            description: "A test".into(),
            default_branch: "main".into(),
            visibility: "public".into(),
            creator: SerializablePublicKey::from_pubkey(&key.public_key()),
            signature: SerializableSignature::from_signature(&sig),
        }
    }

    /// Adds transactions for seeds `1..=count` and returns their ids in
    /// insertion order.
    fn fill(mempool: &Mempool, count: u64) -> Vec<TransactionId> {
        (1..=count)
            .map(|seed| mempool.add(test_tx(seed)).unwrap())
            .collect()
    }

    #[test]
    fn test_mempool_add_and_get() {
        let mempool = Mempool::with_defaults();
        let tx = test_tx(1);
        let id = tx.id();

        let result = mempool.add(tx.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), id);

        // The stored transaction is the one we added.
        assert_eq!(mempool.get(&id).map(|stored| stored.id()), Some(id));
        assert_eq!(mempool.len(), 1);
        assert!(!mempool.is_empty());
    }

    #[test]
    fn test_mempool_duplicate() {
        let mempool = Mempool::with_defaults();
        let tx = test_tx(1);

        assert!(mempool.add(tx.clone()).is_ok());
        assert!(matches!(
            mempool.add(tx),
            Err(ConsensusError::DuplicateTransaction(_))
        ));

        // A rejected duplicate must not change the pool.
        assert_eq!(mempool.len(), 1);
    }

    #[test]
    fn test_mempool_remove() {
        let mempool = Mempool::with_defaults();
        let tx = test_tx(1);
        let id = mempool.add(tx).unwrap();

        assert!(mempool.contains(&id));
        assert!(mempool.remove(&id).is_some());
        assert!(!mempool.contains(&id));

        // Removing again is a no-op.
        assert!(mempool.remove(&id).is_none());
        assert!(mempool.is_empty());
    }

    #[test]
    fn test_mempool_capacity() {
        let config = MempoolConfig {
            max_transactions: 3,
            ..Default::default()
        };
        let mempool = Mempool::new(config);

        let ids = fill(&mempool, 5);

        assert_eq!(mempool.len(), 3);

        // Eviction is oldest-first: the first two are gone, the rest remain.
        assert!(ids[..2].iter().all(|id| !mempool.contains(id)));
        assert!(ids[2..].iter().all(|id| mempool.contains(id)));
    }

    #[test]
    fn test_mempool_get_for_proposal() {
        let config = MempoolConfig {
            max_transactions_per_block: 2,
            ..Default::default()
        };
        let mempool = Mempool::new(config);

        let ids = fill(&mempool, 5);

        let proposal = mempool.get_for_proposal();
        assert_eq!(proposal.len(), 2);

        // The proposal holds the oldest transactions in insertion order.
        let proposed_ids: Vec<_> = proposal.iter().map(|tx| tx.id()).collect();
        assert_eq!(proposed_ids, ids[..2].to_vec());

        // Proposing does not remove anything from the pool.
        assert_eq!(mempool.len(), 5);
    }

    #[test]
    fn test_mempool_stats() {
        let mempool = Mempool::with_defaults();

        let empty = mempool.stats();
        assert_eq!(empty.transaction_count, 0);
        assert_eq!(empty.oldest_transaction_age, Duration::ZERO);
        assert!(empty.average_propose_count.abs() < f64::EPSILON);

        fill(&mempool, 3);

        let stats = mempool.stats();
        assert_eq!(stats.transaction_count, 3);
        assert!(stats.average_propose_count.abs() < f64::EPSILON);

        // One proposal round touches all three transactions exactly once.
        assert_eq!(mempool.get_for_proposal().len(), 3);
        let stats = mempool.stats();
        assert!((stats.average_propose_count - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_mempool_remove_batch() {
        let mempool = Mempool::with_defaults();
        let ids = fill(&mempool, 5);

        mempool.remove_batch(&ids[0..3]);
        assert_eq!(mempool.len(), 2);

        assert!(ids[..3].iter().all(|id| !mempool.contains(id)));
        assert!(ids[3..].iter().all(|id| mempool.contains(id)));

        // The ordering queue was pruned too: survivors come back in order.
        let remaining: Vec<_> = mempool
            .get_for_proposal()
            .iter()
            .map(|tx| tx.id())
            .collect();
        assert_eq!(remaining, ids[3..].to_vec());
    }

    #[test]
    fn test_mempool_reap_expired() {
        // With the default ten-minute expiry nothing is reaped.
        let fresh = Mempool::with_defaults();
        fill(&fresh, 3);
        assert_eq!(fresh.reap_expired(), 0);
        assert_eq!(fresh.len(), 3);

        // With a zero expiry, anything with a non-zero age is reaped.
        let config = MempoolConfig {
            max_transaction_age: Duration::ZERO,
            ..Default::default()
        };
        let mempool = Mempool::new(config);
        fill(&mempool, 3);

        // Guarantee a strictly positive age even on coarse clocks.
        std::thread::sleep(Duration::from_millis(5));

        assert_eq!(mempool.reap_expired(), 3);
        assert!(mempool.is_empty());
        assert!(mempool.get_for_proposal().is_empty());
    }
}