Skip to main content

amaru_ouroboros_traits/
mempool.rs

1// Copyright 2025 PRAGMA
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    fmt,
17    fmt::{Display, Formatter},
18    pin::Pin,
19    sync::Arc,
20};
21
22use amaru_kernel::{Hash, Hasher, Peer, TransactionId, cbor, size::TRANSACTION_BODY};
23use serde::{Deserialize, Serialize};
24
25use crate::CanValidateTransactions;
26
27/// An simple mempool interface to:
28///
29/// - Retrieve transactions to be included in a new block.
30/// - Acknowledge the transactions included in a block, so they can be removed from the pool.
31/// - Support the transaction submission protocol to share transactions with peers.
32///
33pub trait Mempool<Tx: Send + Sync + 'static>: TxSubmissionMempool<Tx> + Send + Sync {
34    /// Take transactions out of the mempool, with the intent of forging a new block.
35    ///
36    /// TODO: Have this function take _constraints_, such as the block max size or max execution
37    /// units and select transactions accordingly.
38    fn take(&self) -> Vec<Tx>;
39
40    /// Take note of a transaction that has been included in a block.
41    /// The keys function is used to detect all the transactions that should be removed from the mempool.
42    /// (if a transaction in the mempool shares any of the transactions keys, it should be removed).
43    fn acknowledge<TxKey: Ord, I>(&self, tx: &Tx, keys: fn(&Tx) -> I)
44    where
45        I: IntoIterator<Item = TxKey>,
46        Self: Sized;
47}
48
49pub type ResourceMempool<Tx> = Arc<dyn TxSubmissionMempool<Tx>>;
50
51pub trait TxSubmissionMempool<Tx: Send + Sync + 'static>: Send + Sync + CanValidateTransactions<Tx> {
52    /// Insert a transaction into the mempool, specifying its origin.
53    /// A TxOrigin::Local origin indicates the transaction was created on the current node,
54    /// A TxOrigin::Remote(origin_peer) indicates the transaction was received from a remote peer.
55    fn insert(&self, tx: Tx, tx_origin: TxOrigin) -> Result<(TxId, MempoolSeqNo), TxRejectReason>;
56
57    /// Add a new, local, transaction to the mempool.
58    ///
59    /// TODO: Have the mempool perform its own set of validations and possibly fail to add new
60    /// elements. This is non-trivial, since it requires the mempool to have ways of re-validating
61    /// transactions provided a slightly different context.
62    ///
63    /// We shall circle back to this once we've done some progress on the ledger validations and
64    /// the so-called ledger slices.
65    ///
66    /// Return the assigned `MempoolSeqNo` if accepted.
67    fn add(&self, tx: Tx) -> Result<(), TxRejectReason> {
68        let _ = self.insert(tx, TxOrigin::Local)?;
69        Ok(())
70    }
71
72    /// Retrieve a transaction by its id.
73    fn get_tx(&self, tx_id: &TxId) -> Option<Tx>;
74
75    /// Return true if the mempool contains a transaction with the given id.
76    fn contains(&self, tx_id: &TxId) -> bool {
77        self.get_tx(tx_id).is_some()
78    }
79
80    /// Retrieve a list of transaction ids from a given sequence number (inclusive), up to a given limit.
81    fn tx_ids_since(&self, from_seq: MempoolSeqNo, limit: u16) -> Vec<(TxId, u32, MempoolSeqNo)>;
82
83    /// Wait until the mempool reaches at least the given sequence number.
84    /// Then a tx submission initiator knows that there are enough new transactions to send to its peer.
85    ///
86    /// When the mempool is already at or above the required number,
87    /// this future will resolve to `true` immediately.
88    ///
89    /// Otherwise, if for some reason the mempool cannot reach the required number, it should return
90    /// false.
91    fn wait_for_at_least(&self, seq_no: MempoolSeqNo) -> Pin<Box<dyn Future<Output = bool> + Send + '_>>;
92
93    /// Retrieve a list of transactions for the given ids.
94    fn get_txs_for_ids(&self, ids: &[TxId]) -> Vec<Tx>;
95
96    /// Get the last assigned sequence number in the mempool.
97    fn last_seq_no(&self) -> MempoolSeqNo;
98}
99
100/// Sequence number assigned to a transaction when inserted into the mempool.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Default)]
102pub struct MempoolSeqNo(pub u64);
103
104impl MempoolSeqNo {
105    pub fn next(&self) -> MempoolSeqNo {
106        MempoolSeqNo(self.0 + 1)
107    }
108
109    pub fn add(&self, n: u64) -> MempoolSeqNo {
110        MempoolSeqNo(self.0 + n)
111    }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, thiserror::Error, Serialize, Deserialize)]
115pub enum TxRejectReason {
116    #[error("Mempool is full")]
117    MempoolFull,
118    #[error("Transaction is a duplicate")]
119    Duplicate,
120    #[error("Transaction is invalid")]
121    Invalid,
122}
123
124/// Origin of a transaction being inserted into the mempool:
125/// - Local: created locally
126/// - Remote(Peer): received from a remote peer
127#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
128pub enum TxOrigin {
129    Local,
130    Remote(Peer),
131}
132
133/// Identifier for a transaction in the mempool.
134/// It is derived from the hash of the encoding of the transaction as CBOR.
135#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
136pub struct TxId(TransactionId);
137
138impl cbor::Encode<()> for TxId {
139    fn encode<W: cbor::encode::Write>(
140        &self,
141        e: &mut cbor::Encoder<W>,
142        _ctx: &mut (),
143    ) -> Result<(), cbor::encode::Error<W::Error>> {
144        e.encode(self.0)?;
145        Ok(())
146    }
147}
148
149impl<'b> cbor::Decode<'b, ()> for TxId {
150    fn decode(d: &mut cbor::Decoder<'b>, _ctx: &mut ()) -> Result<Self, cbor::decode::Error> {
151        let hash = Hash::<TRANSACTION_BODY>::decode(d, _ctx)?;
152        Ok(TxId(hash))
153    }
154}
155
156impl TxId {
157    pub fn to_vec(&self) -> Vec<u8> {
158        self.0.as_ref().to_vec()
159    }
160
161    pub fn as_slice(&self) -> &[u8] {
162        self.0.as_slice()
163    }
164}
165
166impl TxId {
167    pub fn new(hash: Hash<TRANSACTION_BODY>) -> Self {
168        TxId(hash)
169    }
170
171    pub fn from<Tx: cbor::Encode<()>>(tx: &Tx) -> Self {
172        TxId(Hasher::<{ TRANSACTION_BODY * 8 }>::hash_cbor(tx))
173    }
174}
175
176impl Display for TxId {
177    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
178        write!(f, "{}", hex::encode(self.as_slice()))
179    }
180}