Skip to main content

ethrex_p2p/
tx_broadcaster.rs

1use std::{
2    collections::HashMap,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use ethrex_blockchain::Blockchain;
8use ethrex_common::H256;
9use ethrex_common::types::{MempoolTransaction, Transaction};
10use ethrex_storage::error::StoreError;
11use rand::{seq::SliceRandom, thread_rng};
12use spawned_concurrency::{
13    actor,
14    error::ActorError,
15    protocol,
16    tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, send_interval},
17};
18use tracing::{debug, error, trace};
19
20use crate::{
21    peer_table::{PeerTable, PeerTableServerProtocol as _},
22    rlpx::{
23        Message,
24        connection::server::PeerConnection,
25        eth::transactions::{NewPooledTransactionHashes, Transactions},
26        p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
27    },
28};
29
30// Soft limit for the number of transaction hashes sent in a single NewPooledTransactionHashes message as per [the spec](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x080)
31const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
32
33// Amount of seconds after which we prune broadcast records (We should fine tune this)
34const PRUNE_WAIT_TIME_SECS: u64 = 600; // 10 minutes
35
36// Amount of seconds between each prune
37const PRUNE_INTERVAL_SECS: u64 = 360; // 6 minutes
38
39// Amount of milliseconds between each broadcast
40pub const BROADCAST_INTERVAL_MS: u64 = 1000; // 1 second
41
42#[protocol]
43pub trait TxBroadcasterProtocol: Send + Sync {
44    fn broadcast_txs(&self) -> Result<(), ActorError>;
45    fn add_txs(&self, tx_hashes: Vec<H256>, peer_id: H256) -> Result<(), ActorError>;
46    fn prune_txs(&self) -> Result<(), ActorError>;
47}
48
49#[derive(Debug, Clone, Default)]
50struct PeerMask {
51    bits: Vec<u64>,
52}
53
54impl PeerMask {
55    #[inline]
56    // Ensure that the internal bit vector can hold the given index
57    // If not, resize the vector.
58    fn ensure(&mut self, idx: u32) {
59        let word = (idx as usize) / 64;
60        if self.bits.len() <= word {
61            self.bits.resize(word + 1, 0);
62        }
63    }
64
65    #[inline]
66    fn is_set(&self, idx: u32) -> bool {
67        let word = (idx as usize) / 64;
68        if word >= self.bits.len() {
69            return false;
70        }
71        let bit = (idx as usize) % 64;
72        (self.bits[word] >> bit) & 1 == 1
73    }
74
75    #[inline]
76    fn set(&mut self, idx: u32) {
77        self.ensure(idx);
78        let word = (idx as usize) / 64;
79        let bit = (idx as usize) % 64;
80        self.bits[word] |= 1u64 << bit;
81    }
82}
83
84#[derive(Debug, Clone)]
85struct BroadcastRecord {
86    peers: PeerMask,
87    last_sent: Instant,
88}
89
90impl Default for BroadcastRecord {
91    fn default() -> Self {
92        Self {
93            peers: PeerMask::default(),
94            last_sent: Instant::now(),
95        }
96    }
97}
98
99#[derive(Debug, Clone)]
100pub struct TxBroadcaster {
101    peer_table: PeerTable,
102    blockchain: Arc<Blockchain>,
103    // tx_hash -> broadcast record (which peers know it and when it was last sent)
104    known_txs: HashMap<H256, BroadcastRecord>,
105    // Assign each peer_id (H256) a u32 index used by PeerMask entries
106    peer_indexer: HashMap<H256, u32>,
107    // Next index to assign to a new peer
108    next_peer_idx: u32,
109    tx_broadcasting_time_interval: u64,
110}
111
112pub async fn send_tx_hashes(
113    txs: Vec<MempoolTransaction>,
114    capabilities: Vec<Capability>,
115    connection: &mut PeerConnection,
116    peer_id: H256,
117    blockchain: &Arc<Blockchain>,
118) -> Result<(), TxBroadcasterError> {
119    if SUPPORTED_ETH_CAPABILITIES
120        .iter()
121        .any(|cap| capabilities.contains(cap))
122    {
123        for tx_chunk in txs.chunks(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT) {
124            let tx_count = tx_chunk.len();
125            let mut txs_to_send = Vec::with_capacity(tx_count);
126            for tx in tx_chunk {
127                txs_to_send.push((**tx).clone());
128            }
129            let hashes_message = Message::NewPooledTransactionHashes(
130                NewPooledTransactionHashes::new(txs_to_send, blockchain)?,
131            );
132            connection.outgoing_message(hashes_message.clone()).await.unwrap_or_else(|err| {
133                debug!(peer_id = %format!("{:#x}", peer_id), err = ?err, "Failed to send transaction hashes");
134            });
135        }
136    }
137    Ok(())
138}
139
140#[actor(protocol = TxBroadcasterProtocol)]
141impl TxBroadcaster {
142    pub fn spawn(
143        kademlia: PeerTable,
144        blockchain: Arc<Blockchain>,
145        tx_broadcasting_time_interval: u64,
146    ) -> Result<ActorRef<TxBroadcaster>, TxBroadcasterError> {
147        debug!("Starting transaction broadcaster");
148
149        let state = TxBroadcaster {
150            peer_table: kademlia,
151            blockchain,
152            known_txs: HashMap::new(),
153            peer_indexer: HashMap::new(),
154            next_peer_idx: 0,
155            tx_broadcasting_time_interval,
156        };
157
158        Ok(state.start())
159    }
160
161    #[started]
162    async fn started(&mut self, ctx: &Context<Self>) {
163        send_interval(
164            Duration::from_millis(self.tx_broadcasting_time_interval),
165            ctx.clone(),
166            tx_broadcaster_protocol::BroadcastTxs,
167        );
168
169        send_interval(
170            Duration::from_secs(PRUNE_INTERVAL_SECS),
171            ctx.clone(),
172            tx_broadcaster_protocol::PruneTxs,
173        );
174    }
175
176    #[send_handler]
177    async fn handle_broadcast_txs(
178        &mut self,
179        _msg: tx_broadcaster_protocol::BroadcastTxs,
180        _ctx: &Context<Self>,
181    ) {
182        trace!(received = "BroadcastTxs");
183
184        let _ = self.do_broadcast_txs().await.inspect_err(|err| {
185            error!(err = ?err, "Failed to broadcast transactions");
186        });
187    }
188
189    #[send_handler]
190    async fn handle_add_txs(&mut self, msg: tx_broadcaster_protocol::AddTxs, _ctx: &Context<Self>) {
191        debug!(received = "AddTxs", tx_count = msg.tx_hashes.len());
192        self.do_add_txs(msg.tx_hashes, msg.peer_id);
193    }
194
195    #[send_handler]
196    async fn handle_prune_txs(
197        &mut self,
198        _msg: tx_broadcaster_protocol::PruneTxs,
199        _ctx: &Context<Self>,
200    ) {
201        debug!(received = "PruneTxs");
202        let now = Instant::now();
203        let before = self.known_txs.len();
204        let prune_window = Duration::from_secs(PRUNE_WAIT_TIME_SECS);
205
206        self.known_txs
207            .retain(|_, record| now.duration_since(record.last_sent) < prune_window);
208        debug!(
209            before = before,
210            after = self.known_txs.len(),
211            "Pruned old broadcasted transactions"
212        );
213
214        // Piggyback the alternates-map sweep on this same tick.
215        let _ = self.blockchain.mempool.prune_alternates(prune_window);
216    }
217
218    // Get or assign a unique index to the peer_id
219    #[inline]
220    fn peer_index(&mut self, peer_id: H256) -> u32 {
221        if let Some(&idx) = self.peer_indexer.get(&peer_id) {
222            idx
223        } else {
224            // We are assigning indexes sequentially, so next_peer_idx is always the next available one.
225            // self.peer_indexer.len() could be used instead of next_peer_idx but avoided here if we ever
226            // remove entries from peer_indexer in the future.
227            let idx = self.next_peer_idx;
228            // In practice we won't exceed u32::MAX (~4.29 Billion) peers.
229            self.next_peer_idx += 1;
230            self.peer_indexer.insert(peer_id, idx);
231            idx
232        }
233    }
234
235    fn do_add_txs(&mut self, txs: Vec<H256>, peer_id: H256) {
236        debug!(total = self.known_txs.len(), adding = txs.len(), peer_id = %format!("{:#x}", peer_id), "Adding transactions to known list");
237
238        if txs.is_empty() {
239            return;
240        }
241
242        let now = Instant::now();
243        let peer_idx = self.peer_index(peer_id);
244        for tx in txs {
245            let record = self.known_txs.entry(tx).or_default();
246            record.peers.set(peer_idx);
247            record.last_sent = now;
248        }
249    }
250
251    async fn do_broadcast_txs(&mut self) -> Result<(), TxBroadcasterError> {
252        let txs_to_broadcast = self
253            .blockchain
254            .mempool
255            .get_txs_for_broadcast()
256            .map_err(|_| TxBroadcasterError::Broadcast)?;
257        if txs_to_broadcast.is_empty() {
258            return Ok(());
259        }
260        let peers = self.peer_table.get_peers_with_capabilities().await?;
261        let peer_sqrt = (peers.len() as f64).sqrt();
262
263        let full_txs = txs_to_broadcast
264            .iter()
265            .map(|tx| tx.transaction().clone())
266            .filter(|tx| {
267                !matches!(tx, Transaction::EIP4844Transaction { .. }) && !tx.is_privileged()
268            })
269            .collect::<Vec<Transaction>>();
270
271        let blob_txs = txs_to_broadcast
272            .iter()
273            .filter(|tx| matches!(tx.transaction(), Transaction::EIP4844Transaction { .. }))
274            .cloned()
275            .collect::<Vec<MempoolTransaction>>();
276
277        let mut shuffled_peers = peers.clone();
278        shuffled_peers.shuffle(&mut thread_rng());
279
280        let (peers_to_send_full_txs, peers_to_send_hashes) =
281            shuffled_peers.split_at(peer_sqrt.ceil() as usize);
282
283        for (peer_id, mut connection, capabilities) in peers_to_send_full_txs.iter().cloned() {
284            let peer_idx = self.peer_index(peer_id);
285            let txs_to_send = full_txs
286                .iter()
287                .filter(|tx| {
288                    let hash = tx.hash();
289                    !self
290                        .known_txs
291                        .get(&hash)
292                        .is_some_and(|record| record.peers.is_set(peer_idx))
293                })
294                .cloned()
295                .collect::<Vec<Transaction>>();
296            self.do_add_txs(txs_to_send.iter().map(|tx| tx.hash()).collect(), peer_id);
297            // If a peer is selected to receive the full transactions, we don't send the blob transactions, since they only require to send the hashes
298            let txs_message = Message::Transactions(Transactions {
299                transactions: txs_to_send,
300            });
301            connection.outgoing_message(txs_message).await.unwrap_or_else(|err| {
302                debug!(peer_id = %format!("{:#x}", peer_id), err = ?err, "Failed to send transactions");
303            });
304            self.send_tx_hashes_internal(blob_txs.clone(), capabilities, &mut connection, peer_id)
305                .await?;
306        }
307        for (peer_id, mut connection, capabilities) in peers_to_send_hashes.iter().cloned() {
308            // If a peer is not selected to receive the full transactions, we only send the hashes of all transactions (including blob transactions)
309            self.send_tx_hashes_internal(
310                txs_to_broadcast.clone(),
311                capabilities,
312                &mut connection,
313                peer_id,
314            )
315            .await?;
316        }
317        let broadcasted_hashes: Vec<H256> = txs_to_broadcast.iter().map(|tx| tx.hash()).collect();
318        self.blockchain
319            .mempool
320            .remove_broadcasted_txs(&broadcasted_hashes)?;
321        Ok(())
322    }
323
324    async fn send_tx_hashes_internal(
325        &mut self,
326        txs: Vec<MempoolTransaction>,
327        capabilities: Vec<Capability>,
328        connection: &mut PeerConnection,
329        peer_id: H256,
330    ) -> Result<(), TxBroadcasterError> {
331        let peer_idx = self.peer_index(peer_id);
332        let txs_to_send = txs
333            .iter()
334            .filter(|tx| {
335                let hash = tx.hash();
336                !self
337                    .known_txs
338                    .get(&hash)
339                    .is_some_and(|record| record.peers.is_set(peer_idx))
340                    && !tx.is_privileged()
341            })
342            .cloned()
343            .collect::<Vec<MempoolTransaction>>();
344        self.do_add_txs(txs_to_send.iter().map(|tx| tx.hash()).collect(), peer_id);
345        send_tx_hashes(
346            txs_to_send,
347            capabilities,
348            connection,
349            peer_id,
350            &self.blockchain,
351        )
352        .await
353    }
354}
355
356#[derive(Debug, thiserror::Error)]
357pub enum TxBroadcasterError {
358    #[error("Failed to broadcast transactions")]
359    Broadcast,
360    #[error(transparent)]
361    StoreError(#[from] StoreError),
362    #[error(transparent)]
363    PeerTableError(#[from] ActorError),
364}