amaru_ouroboros_traits/mempool.rs
1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 fmt,
17 fmt::{Display, Formatter},
18 pin::Pin,
19 sync::Arc,
20};
21
22use amaru_kernel::{Hash, Hasher, Peer, TransactionId, cbor, size::TRANSACTION_BODY};
23use serde::{Deserialize, Serialize};
24
25use crate::CanValidateTransactions;
26
27/// An simple mempool interface to:
28///
29/// - Retrieve transactions to be included in a new block.
30/// - Acknowledge the transactions included in a block, so they can be removed from the pool.
31/// - Support the transaction submission protocol to share transactions with peers.
32///
33pub trait Mempool<Tx: Send + Sync + 'static>: TxSubmissionMempool<Tx> + Send + Sync {
34 /// Take transactions out of the mempool, with the intent of forging a new block.
35 ///
36 /// TODO: Have this function take _constraints_, such as the block max size or max execution
37 /// units and select transactions accordingly.
38 fn take(&self) -> Vec<Tx>;
39
40 /// Take note of a transaction that has been included in a block.
41 /// The keys function is used to detect all the transactions that should be removed from the mempool.
42 /// (if a transaction in the mempool shares any of the transactions keys, it should be removed).
43 fn acknowledge<TxKey: Ord, I>(&self, tx: &Tx, keys: fn(&Tx) -> I)
44 where
45 I: IntoIterator<Item = TxKey>,
46 Self: Sized;
47}
48
49pub type ResourceMempool<Tx> = Arc<dyn TxSubmissionMempool<Tx>>;
50
51pub trait TxSubmissionMempool<Tx: Send + Sync + 'static>: Send + Sync + CanValidateTransactions<Tx> {
52 /// Insert a transaction into the mempool, specifying its origin.
53 /// A TxOrigin::Local origin indicates the transaction was created on the current node,
54 /// A TxOrigin::Remote(origin_peer) indicates the transaction was received from a remote peer.
55 fn insert(&self, tx: Tx, tx_origin: TxOrigin) -> Result<(TxId, MempoolSeqNo), TxRejectReason>;
56
57 /// Add a new, local, transaction to the mempool.
58 ///
59 /// TODO: Have the mempool perform its own set of validations and possibly fail to add new
60 /// elements. This is non-trivial, since it requires the mempool to have ways of re-validating
61 /// transactions provided a slightly different context.
62 ///
63 /// We shall circle back to this once we've done some progress on the ledger validations and
64 /// the so-called ledger slices.
65 ///
66 /// Return the assigned `MempoolSeqNo` if accepted.
67 fn add(&self, tx: Tx) -> Result<(), TxRejectReason> {
68 let _ = self.insert(tx, TxOrigin::Local)?;
69 Ok(())
70 }
71
72 /// Retrieve a transaction by its id.
73 fn get_tx(&self, tx_id: &TxId) -> Option<Tx>;
74
75 /// Return true if the mempool contains a transaction with the given id.
76 fn contains(&self, tx_id: &TxId) -> bool {
77 self.get_tx(tx_id).is_some()
78 }
79
80 /// Retrieve a list of transaction ids from a given sequence number (inclusive), up to a given limit.
81 fn tx_ids_since(&self, from_seq: MempoolSeqNo, limit: u16) -> Vec<(TxId, u32, MempoolSeqNo)>;
82
83 /// Wait until the mempool reaches at least the given sequence number.
84 /// Then a tx submission initiator knows that there are enough new transactions to send to its peer.
85 ///
86 /// When the mempool is already at or above the required number,
87 /// this future will resolve to `true` immediately.
88 ///
89 /// Otherwise, if for some reason the mempool cannot reach the required number, it should return
90 /// false.
91 fn wait_for_at_least(&self, seq_no: MempoolSeqNo) -> Pin<Box<dyn Future<Output = bool> + Send + '_>>;
92
93 /// Retrieve a list of transactions for the given ids.
94 fn get_txs_for_ids(&self, ids: &[TxId]) -> Vec<Tx>;
95
96 /// Get the last assigned sequence number in the mempool.
97 fn last_seq_no(&self) -> MempoolSeqNo;
98}
99
100/// Sequence number assigned to a transaction when inserted into the mempool.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default)]
102pub struct MempoolSeqNo(pub u64);
103
104impl MempoolSeqNo {
105 pub fn next(&self) -> MempoolSeqNo {
106 MempoolSeqNo(self.0 + 1)
107 }
108
109 pub fn add(&self, n: u64) -> MempoolSeqNo {
110 MempoolSeqNo(self.0 + n)
111 }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, thiserror::Error, Serialize, Deserialize)]
115pub enum TxRejectReason {
116 #[error("Mempool is full")]
117 MempoolFull,
118 #[error("Transaction is a duplicate")]
119 Duplicate,
120 #[error("Transaction is invalid")]
121 Invalid,
122}
123
124/// Origin of a transaction being inserted into the mempool:
125/// - Local: created locally
126/// - Remote(Peer): received from a remote peer
127#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
128pub enum TxOrigin {
129 Local,
130 Remote(Peer),
131}
132
133/// Identifier for a transaction in the mempool.
134/// It is derived from the hash of the encoding of the transaction as CBOR.
135#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
136pub struct TxId(TransactionId);
137
138impl cbor::Encode<()> for TxId {
139 fn encode<W: cbor::encode::Write>(
140 &self,
141 e: &mut cbor::Encoder<W>,
142 _ctx: &mut (),
143 ) -> Result<(), cbor::encode::Error<W::Error>> {
144 e.encode(self.0)?;
145 Ok(())
146 }
147}
148
149impl<'b> cbor::Decode<'b, ()> for TxId {
150 fn decode(d: &mut cbor::Decoder<'b>, _ctx: &mut ()) -> Result<Self, cbor::decode::Error> {
151 let hash = Hash::<TRANSACTION_BODY>::decode(d, _ctx)?;
152 Ok(TxId(hash))
153 }
154}
155
156impl TxId {
157 pub fn to_vec(&self) -> Vec<u8> {
158 self.0.as_ref().to_vec()
159 }
160
161 pub fn as_slice(&self) -> &[u8] {
162 self.0.as_slice()
163 }
164}
165
166impl TxId {
167 pub fn new(hash: Hash<TRANSACTION_BODY>) -> Self {
168 TxId(hash)
169 }
170
171 pub fn from<Tx: cbor::Encode<()>>(tx: &Tx) -> Self {
172 TxId(Hasher::<{ TRANSACTION_BODY * 8 }>::hash_cbor(tx))
173 }
174}
175
176impl Display for TxId {
177 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
178 write!(f, "{}", hex::encode(self.as_slice()))
179 }
180}