guts_consensus/
mempool.rs

1//! Transaction mempool for pending transactions.
2//!
3//! The mempool holds transactions that have been submitted but not yet
4//! included in a finalized block.
5
6use crate::error::{ConsensusError, Result};
7use crate::transaction::{Transaction, TransactionId};
8use parking_lot::RwLock;
9use std::collections::{HashMap, VecDeque};
10use std::time::{Duration, Instant};
11
/// Tunable limits that govern how a mempool stores and hands out
/// pending transactions.
#[derive(Clone, Debug)]
pub struct MempoolConfig {
    /// Upper bound on how many transactions the mempool keeps at once.
    pub max_transactions: usize,

    /// How long a transaction may sit in the mempool before it counts as
    /// expired.
    pub max_transaction_age: Duration,

    /// Upper bound on how many transactions a single block proposal may
    /// carry.
    pub max_transactions_per_block: usize,
}
24
25impl Default for MempoolConfig {
26    fn default() -> Self {
27        Self {
28            max_transactions: 10_000,
29            max_transaction_age: Duration::from_secs(600), // 10 minutes
30            max_transactions_per_block: 1000,
31        }
32    }
33}
34
35/// Metadata about a pending transaction.
36#[derive(Debug, Clone)]
37struct PendingTransaction {
38    /// The transaction.
39    transaction: Transaction,
40
41    /// When the transaction was added.
42    added_at: Instant,
43
44    /// Number of times this transaction has been proposed but not finalized.
45    propose_count: u32,
46}
47
48/// The transaction mempool.
49pub struct Mempool {
50    /// Configuration.
51    config: MempoolConfig,
52
53    /// Pending transactions indexed by ID.
54    transactions: RwLock<HashMap<TransactionId, PendingTransaction>>,
55
56    /// Order of transaction arrival (for FIFO proposal).
57    order: RwLock<VecDeque<TransactionId>>,
58}
59
60impl Mempool {
61    /// Creates a new mempool with the given configuration.
62    pub fn new(config: MempoolConfig) -> Self {
63        Self {
64            config,
65            transactions: RwLock::new(HashMap::new()),
66            order: RwLock::new(VecDeque::new()),
67        }
68    }
69
70    /// Creates a new mempool with default configuration.
71    pub fn with_defaults() -> Self {
72        Self::new(MempoolConfig::default())
73    }
74
75    /// Adds a transaction to the mempool.
76    pub fn add(&self, transaction: Transaction) -> Result<TransactionId> {
77        let id = transaction.id();
78
79        let mut txs = self.transactions.write();
80        let mut order = self.order.write();
81
82        // Check for duplicates
83        if txs.contains_key(&id) {
84            return Err(ConsensusError::DuplicateTransaction(id.to_hex()));
85        }
86
87        // Evict old transactions if at capacity
88        while txs.len() >= self.config.max_transactions {
89            if let Some(old_id) = order.pop_front() {
90                txs.remove(&old_id);
91                tracing::debug!(?old_id, "evicted transaction due to mempool capacity");
92            } else {
93                break;
94            }
95        }
96
97        // Add the transaction
98        let pending = PendingTransaction {
99            transaction,
100            added_at: Instant::now(),
101            propose_count: 0,
102        };
103
104        txs.insert(id, pending);
105        order.push_back(id);
106
107        tracing::trace!(?id, "added transaction to mempool");
108
109        Ok(id)
110    }
111
112    /// Gets a transaction by ID.
113    pub fn get(&self, id: &TransactionId) -> Option<Transaction> {
114        self.transactions
115            .read()
116            .get(id)
117            .map(|p| p.transaction.clone())
118    }
119
120    /// Checks if a transaction exists in the mempool.
121    pub fn contains(&self, id: &TransactionId) -> bool {
122        self.transactions.read().contains_key(id)
123    }
124
125    /// Returns the number of pending transactions.
126    pub fn len(&self) -> usize {
127        self.transactions.read().len()
128    }
129
130    /// Returns true if the mempool is empty.
131    pub fn is_empty(&self) -> bool {
132        self.transactions.read().is_empty()
133    }
134
135    /// Removes a transaction from the mempool.
136    pub fn remove(&self, id: &TransactionId) -> Option<Transaction> {
137        let mut txs = self.transactions.write();
138        let mut order = self.order.write();
139
140        if let Some(pending) = txs.remove(id) {
141            order.retain(|tx_id| tx_id != id);
142            Some(pending.transaction)
143        } else {
144            None
145        }
146    }
147
148    /// Removes multiple transactions from the mempool.
149    pub fn remove_batch(&self, ids: &[TransactionId]) {
150        let mut txs = self.transactions.write();
151        let mut order = self.order.write();
152
153        for id in ids {
154            txs.remove(id);
155        }
156
157        order.retain(|tx_id| !ids.contains(tx_id));
158
159        tracing::debug!(count = ids.len(), "removed batch from mempool");
160    }
161
162    /// Gets transactions for a block proposal.
163    ///
164    /// Returns up to `max_transactions_per_block` transactions in FIFO order.
165    pub fn get_for_proposal(&self) -> Vec<Transaction> {
166        let now = Instant::now();
167        let mut txs = self.transactions.write();
168        let order = self.order.read();
169
170        let mut result = Vec::with_capacity(self.config.max_transactions_per_block);
171
172        for id in order.iter() {
173            if result.len() >= self.config.max_transactions_per_block {
174                break;
175            }
176
177            if let Some(pending) = txs.get_mut(id) {
178                // Skip if too old
179                if now.duration_since(pending.added_at) > self.config.max_transaction_age {
180                    continue;
181                }
182
183                pending.propose_count += 1;
184                result.push(pending.transaction.clone());
185            }
186        }
187
188        result
189    }
190
191    /// Reaps expired transactions from the mempool.
192    pub fn reap_expired(&self) -> usize {
193        let now = Instant::now();
194        let mut txs = self.transactions.write();
195        let mut order = self.order.write();
196
197        let initial_len = txs.len();
198        let expired_ids: Vec<_> = txs
199            .iter()
200            .filter(|(_, pending)| {
201                now.duration_since(pending.added_at) > self.config.max_transaction_age
202            })
203            .map(|(id, _)| *id)
204            .collect();
205
206        for id in &expired_ids {
207            txs.remove(id);
208        }
209
210        order.retain(|id| !expired_ids.contains(id));
211
212        let removed = initial_len - txs.len();
213        if removed > 0 {
214            tracing::debug!(removed, "reaped expired transactions");
215        }
216
217        removed
218    }
219
220    /// Returns statistics about the mempool.
221    pub fn stats(&self) -> MempoolStats {
222        let txs = self.transactions.read();
223        let now = Instant::now();
224
225        let mut oldest_age = Duration::ZERO;
226        let mut total_propose_count = 0u64;
227
228        for pending in txs.values() {
229            let age = now.duration_since(pending.added_at);
230            if age > oldest_age {
231                oldest_age = age;
232            }
233            total_propose_count += pending.propose_count as u64;
234        }
235
236        MempoolStats {
237            transaction_count: txs.len(),
238            oldest_transaction_age: oldest_age,
239            average_propose_count: if txs.is_empty() {
240                0.0
241            } else {
242                total_propose_count as f64 / txs.len() as f64
243            },
244        }
245    }
246}
247
/// A point-in-time summary of the mempool's contents.
#[derive(Clone, Debug)]
pub struct MempoolStats {
    /// How many transactions were pending.
    pub transaction_count: usize,

    /// Time the longest-waiting transaction had spent in the mempool.
    pub oldest_transaction_age: Duration,

    /// Mean propose count across all pending transactions.
    pub average_propose_count: f64,
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transaction::{SerializablePublicKey, SerializableSignature};
    use commonware_cryptography::{ed25519, PrivateKeyExt, Signer};

    /// Builds a signed `CreateRepository` transaction whose key and
    /// repository name are derived from `seed`.
    fn test_tx(seed: u64) -> Transaction {
        let key = ed25519::PrivateKey::from_seed(seed);
        let sig = key.sign(Some(b"_GUTS"), b"test");

        Transaction::CreateRepository {
            owner: "alice".into(),
            name: format!("repo-{}", seed),
            description: "A test".into(),
            default_branch: "main".into(),
            visibility: "public".into(),
            creator: SerializablePublicKey::from_pubkey(&key.public_key()),
            signature: SerializableSignature::from_signature(&sig),
        }
    }

    /// Adds transactions for seeds `1..=count` and returns their IDs in
    /// insertion order.
    fn add_many(mempool: &Mempool, count: u64) -> Vec<TransactionId> {
        (1..=count)
            .map(|seed| mempool.add(test_tx(seed)).unwrap())
            .collect()
    }

    #[test]
    fn test_mempool_add_and_get() {
        let mempool = Mempool::with_defaults();
        assert!(mempool.is_empty());

        let tx = test_tx(1);
        let id = tx.id();

        let result = mempool.add(tx);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), id);

        let retrieved = mempool.get(&id);
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id(), id);
        assert_eq!(mempool.len(), 1);
        assert!(!mempool.is_empty());
    }

    #[test]
    fn test_mempool_duplicate() {
        let mempool = Mempool::with_defaults();
        let tx = test_tx(1);

        assert!(mempool.add(tx.clone()).is_ok());
        assert!(matches!(
            mempool.add(tx),
            Err(ConsensusError::DuplicateTransaction(_))
        ));
        // The rejected duplicate must not change the pool.
        assert_eq!(mempool.len(), 1);
    }

    #[test]
    fn test_mempool_remove() {
        let mempool = Mempool::with_defaults();
        let tx = test_tx(1);
        let id = mempool.add(tx).unwrap();

        assert!(mempool.contains(&id));
        assert!(mempool.remove(&id).is_some());
        assert!(!mempool.contains(&id));
        // A second removal finds nothing.
        assert!(mempool.remove(&id).is_none());
        assert!(mempool.get_for_proposal().is_empty());
    }

    #[test]
    fn test_mempool_capacity() {
        let config = MempoolConfig {
            max_transactions: 3,
            ..Default::default()
        };
        let mempool = Mempool::new(config);

        let ids = add_many(&mempool, 5);

        // The two oldest entries are evicted; the three newest remain.
        assert_eq!(mempool.len(), 3);
        assert!(ids[..2].iter().all(|id| !mempool.contains(id)));
        assert!(ids[2..].iter().all(|id| mempool.contains(id)));
    }

    #[test]
    fn test_mempool_get_for_proposal() {
        let config = MempoolConfig {
            max_transactions_per_block: 2,
            ..Default::default()
        };
        let mempool = Mempool::new(config);

        let ids = add_many(&mempool, 5);

        let proposal = mempool.get_for_proposal();
        assert_eq!(proposal.len(), 2);

        // Proposals are FIFO and do not remove anything from the pool.
        let proposed: Vec<_> = proposal.iter().map(|tx| tx.id()).collect();
        assert_eq!(proposed, ids[..2].to_vec());
        assert_eq!(mempool.len(), 5);
    }

    #[test]
    fn test_mempool_stats() {
        let mempool = Mempool::with_defaults();

        let empty = mempool.stats();
        assert_eq!(empty.transaction_count, 0);
        assert_eq!(empty.oldest_transaction_age, Duration::ZERO);
        assert_eq!(empty.average_propose_count, 0.0);

        add_many(&mempool, 3);

        let stats = mempool.stats();
        assert_eq!(stats.transaction_count, 3);
        assert_eq!(stats.average_propose_count, 0.0);

        // Each transaction is proposed exactly once.
        assert_eq!(mempool.get_for_proposal().len(), 3);
        assert_eq!(mempool.stats().average_propose_count, 1.0);
    }

    #[test]
    fn test_mempool_remove_batch() {
        let mempool = Mempool::with_defaults();
        let ids = add_many(&mempool, 5);

        mempool.remove_batch(&ids[0..3]);
        assert_eq!(mempool.len(), 2);
        assert!(ids[..3].iter().all(|id| !mempool.contains(id)));
        assert!(ids[3..].iter().all(|id| mempool.contains(id)));

        // The arrival queue stays in sync: survivors are proposed in order.
        let proposed: Vec<_> = mempool
            .get_for_proposal()
            .iter()
            .map(|tx| tx.id())
            .collect();
        assert_eq!(proposed, ids[3..].to_vec());
    }

    #[test]
    fn test_mempool_reap_expired() {
        // With the default ten-minute age nothing is expired yet.
        let fresh = Mempool::with_defaults();
        add_many(&fresh, 2);
        assert_eq!(fresh.reap_expired(), 0);
        assert_eq!(fresh.len(), 2);

        let config = MempoolConfig {
            max_transaction_age: Duration::from_millis(1),
            ..Default::default()
        };
        let mempool = Mempool::new(config);
        add_many(&mempool, 2);

        // Wait until both entries are strictly older than the limit.
        std::thread::sleep(Duration::from_millis(5));

        assert!(mempool.get_for_proposal().is_empty());
        assert_eq!(mempool.reap_expired(), 2);
        assert!(mempool.is_empty());
        assert_eq!(mempool.reap_expired(), 0);
    }
}