1use std::{
2 collections::HashMap,
3 sync::Arc,
4 time::{Duration, Instant},
5};
6
7use ethrex_blockchain::Blockchain;
8use ethrex_common::H256;
9use ethrex_common::types::{MempoolTransaction, Transaction};
10use ethrex_storage::error::StoreError;
11use rand::{seq::SliceRandom, thread_rng};
12use spawned_concurrency::{
13 actor,
14 error::ActorError,
15 protocol,
16 tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, send_interval},
17};
18use tracing::{debug, error, trace};
19
20use crate::{
21 peer_table::{PeerTable, PeerTableServerProtocol as _},
22 rlpx::{
23 Message,
24 connection::server::PeerConnection,
25 eth::transactions::{NewPooledTransactionHashes, Transactions},
26 p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
27 },
28};
29
// Upper bound on how many transactions are announced in a single
// `NewPooledTransactionHashes` message; `send_tx_hashes` splits larger batches
// into chunks of this size.
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;

// Age, in seconds, past which a broadcast record is dropped by the prune handler.
// The same window is also handed to the mempool's `prune_alternates`.
const PRUNE_WAIT_TIME_SECS: u64 = 600;
// Period, in seconds, of the `PruneTxs` timer registered when the actor starts.
const PRUNE_INTERVAL_SECS: u64 = 360;
// Broadcast period in milliseconds. Not referenced by the code visible in this
// file; presumably the default interval callers pass to `TxBroadcaster::spawn`
// (TODO confirm at the call sites).
pub const BROADCAST_INTERVAL_MS: u64 = 1000;

// Message protocol of the `TxBroadcaster` actor. The handlers in this file
// receive these calls as `tx_broadcaster_protocol::{BroadcastTxs, AddTxs, PruneTxs}`.
#[protocol]
pub trait TxBroadcasterProtocol: Send + Sync {
    // Asks the actor to run one broadcast round over the mempool's pending broadcast set.
    fn broadcast_txs(&self) -> Result<(), ActorError>;
    // Records `tx_hashes` as already known by the peer identified by `peer_id`.
    fn add_txs(&self, tx_hashes: Vec<H256>, peer_id: H256) -> Result<(), ActorError>;
    // Asks the actor to drop broadcast records older than the prune window.
    fn prune_txs(&self) -> Result<(), ActorError>;
}
48
/// Growable bit set keyed by a `u32` peer index.
///
/// Bit `i` is stored in word `i / 64` at position `i % 64`; indexes beyond the
/// current storage read as unset.
#[derive(Debug, Clone, Default)]
struct PeerMask {
    /// Backing 64-bit words, grown on demand by `ensure`.
    bits: Vec<u64>,
}

impl PeerMask {
    /// Splits a bit index into the position of its word and the single-bit
    /// mask selecting it inside that word.
    #[inline]
    fn locate(idx: u32) -> (usize, u64) {
        let position = idx as usize;
        (position / 64, 1u64 << (position % 64))
    }

    /// Grows the storage with zeroed words so that the word holding `idx`
    /// exists. Never shrinks.
    #[inline]
    fn ensure(&mut self, idx: u32) {
        let (word, _) = Self::locate(idx);
        let required_len = word + 1;
        if self.bits.len() < required_len {
            self.bits.resize(required_len, 0);
        }
    }

    /// Returns `true` when bit `idx` has been set; out-of-range indexes are
    /// reported as unset.
    #[inline]
    fn is_set(&self, idx: u32) -> bool {
        let (word, mask) = Self::locate(idx);
        self.bits.get(word).is_some_and(|&stored| (stored & mask) != 0)
    }

    /// Sets bit `idx`, growing the storage first when needed.
    #[inline]
    fn set(&mut self, idx: u32) {
        self.ensure(idx);
        let (word, mask) = Self::locate(idx);
        // `ensure` guarantees `word` is in bounds.
        self.bits[word] |= mask;
    }
}
83
84#[derive(Debug, Clone)]
85struct BroadcastRecord {
86 peers: PeerMask,
87 last_sent: Instant,
88}
89
90impl Default for BroadcastRecord {
91 fn default() -> Self {
92 Self {
93 peers: PeerMask::default(),
94 last_sent: Instant::now(),
95 }
96 }
97}
98
/// Actor state for broadcasting mempool transactions to peers while tracking
/// which peer has already been associated with which transaction hash.
#[derive(Debug, Clone)]
pub struct TxBroadcaster {
    /// Handle used to fetch the peers (with their connections and capabilities) to broadcast to.
    peer_table: PeerTable,
    /// Shared blockchain handle; its `mempool` supplies the transactions to broadcast.
    blockchain: Arc<Blockchain>,
    /// Transaction hash -> broadcast record (peer bit set plus last-update time).
    known_txs: HashMap<H256, BroadcastRecord>,
    /// Peer id -> index of that peer's bit inside every `PeerMask`.
    peer_indexer: HashMap<H256, u32>,
    /// Index that will be handed to the next previously unseen peer.
    next_peer_idx: u32,
    /// Period between broadcast rounds, in milliseconds.
    tx_broadcasting_time_interval: u64,
}
111
112pub async fn send_tx_hashes(
113 txs: Vec<MempoolTransaction>,
114 capabilities: Vec<Capability>,
115 connection: &mut PeerConnection,
116 peer_id: H256,
117 blockchain: &Arc<Blockchain>,
118) -> Result<(), TxBroadcasterError> {
119 if SUPPORTED_ETH_CAPABILITIES
120 .iter()
121 .any(|cap| capabilities.contains(cap))
122 {
123 for tx_chunk in txs.chunks(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT) {
124 let tx_count = tx_chunk.len();
125 let mut txs_to_send = Vec::with_capacity(tx_count);
126 for tx in tx_chunk {
127 txs_to_send.push((**tx).clone());
128 }
129 let hashes_message = Message::NewPooledTransactionHashes(
130 NewPooledTransactionHashes::new(txs_to_send, blockchain)?,
131 );
132 connection.outgoing_message(hashes_message.clone()).await.unwrap_or_else(|err| {
133 debug!(peer_id = %format!("{:#x}", peer_id), err = ?err, "Failed to send transaction hashes");
134 });
135 }
136 }
137 Ok(())
138}
139
140#[actor(protocol = TxBroadcasterProtocol)]
141impl TxBroadcaster {
142 pub fn spawn(
143 kademlia: PeerTable,
144 blockchain: Arc<Blockchain>,
145 tx_broadcasting_time_interval: u64,
146 ) -> Result<ActorRef<TxBroadcaster>, TxBroadcasterError> {
147 debug!("Starting transaction broadcaster");
148
149 let state = TxBroadcaster {
150 peer_table: kademlia,
151 blockchain,
152 known_txs: HashMap::new(),
153 peer_indexer: HashMap::new(),
154 next_peer_idx: 0,
155 tx_broadcasting_time_interval,
156 };
157
158 Ok(state.start())
159 }
160
161 #[started]
162 async fn started(&mut self, ctx: &Context<Self>) {
163 send_interval(
164 Duration::from_millis(self.tx_broadcasting_time_interval),
165 ctx.clone(),
166 tx_broadcaster_protocol::BroadcastTxs,
167 );
168
169 send_interval(
170 Duration::from_secs(PRUNE_INTERVAL_SECS),
171 ctx.clone(),
172 tx_broadcaster_protocol::PruneTxs,
173 );
174 }
175
176 #[send_handler]
177 async fn handle_broadcast_txs(
178 &mut self,
179 _msg: tx_broadcaster_protocol::BroadcastTxs,
180 _ctx: &Context<Self>,
181 ) {
182 trace!(received = "BroadcastTxs");
183
184 let _ = self.do_broadcast_txs().await.inspect_err(|err| {
185 error!(err = ?err, "Failed to broadcast transactions");
186 });
187 }
188
189 #[send_handler]
190 async fn handle_add_txs(&mut self, msg: tx_broadcaster_protocol::AddTxs, _ctx: &Context<Self>) {
191 debug!(received = "AddTxs", tx_count = msg.tx_hashes.len());
192 self.do_add_txs(msg.tx_hashes, msg.peer_id);
193 }
194
195 #[send_handler]
196 async fn handle_prune_txs(
197 &mut self,
198 _msg: tx_broadcaster_protocol::PruneTxs,
199 _ctx: &Context<Self>,
200 ) {
201 debug!(received = "PruneTxs");
202 let now = Instant::now();
203 let before = self.known_txs.len();
204 let prune_window = Duration::from_secs(PRUNE_WAIT_TIME_SECS);
205
206 self.known_txs
207 .retain(|_, record| now.duration_since(record.last_sent) < prune_window);
208 debug!(
209 before = before,
210 after = self.known_txs.len(),
211 "Pruned old broadcasted transactions"
212 );
213
214 let _ = self.blockchain.mempool.prune_alternates(prune_window);
216 }
217
218 #[inline]
220 fn peer_index(&mut self, peer_id: H256) -> u32 {
221 if let Some(&idx) = self.peer_indexer.get(&peer_id) {
222 idx
223 } else {
224 let idx = self.next_peer_idx;
228 self.next_peer_idx += 1;
230 self.peer_indexer.insert(peer_id, idx);
231 idx
232 }
233 }
234
235 fn do_add_txs(&mut self, txs: Vec<H256>, peer_id: H256) {
236 debug!(total = self.known_txs.len(), adding = txs.len(), peer_id = %format!("{:#x}", peer_id), "Adding transactions to known list");
237
238 if txs.is_empty() {
239 return;
240 }
241
242 let now = Instant::now();
243 let peer_idx = self.peer_index(peer_id);
244 for tx in txs {
245 let record = self.known_txs.entry(tx).or_default();
246 record.peers.set(peer_idx);
247 record.last_sent = now;
248 }
249 }
250
251 async fn do_broadcast_txs(&mut self) -> Result<(), TxBroadcasterError> {
252 let txs_to_broadcast = self
253 .blockchain
254 .mempool
255 .get_txs_for_broadcast()
256 .map_err(|_| TxBroadcasterError::Broadcast)?;
257 if txs_to_broadcast.is_empty() {
258 return Ok(());
259 }
260 let peers = self.peer_table.get_peers_with_capabilities().await?;
261 let peer_sqrt = (peers.len() as f64).sqrt();
262
263 let full_txs = txs_to_broadcast
264 .iter()
265 .map(|tx| tx.transaction().clone())
266 .filter(|tx| {
267 !matches!(tx, Transaction::EIP4844Transaction { .. }) && !tx.is_privileged()
268 })
269 .collect::<Vec<Transaction>>();
270
271 let blob_txs = txs_to_broadcast
272 .iter()
273 .filter(|tx| matches!(tx.transaction(), Transaction::EIP4844Transaction { .. }))
274 .cloned()
275 .collect::<Vec<MempoolTransaction>>();
276
277 let mut shuffled_peers = peers.clone();
278 shuffled_peers.shuffle(&mut thread_rng());
279
280 let (peers_to_send_full_txs, peers_to_send_hashes) =
281 shuffled_peers.split_at(peer_sqrt.ceil() as usize);
282
283 for (peer_id, mut connection, capabilities) in peers_to_send_full_txs.iter().cloned() {
284 let peer_idx = self.peer_index(peer_id);
285 let txs_to_send = full_txs
286 .iter()
287 .filter(|tx| {
288 let hash = tx.hash();
289 !self
290 .known_txs
291 .get(&hash)
292 .is_some_and(|record| record.peers.is_set(peer_idx))
293 })
294 .cloned()
295 .collect::<Vec<Transaction>>();
296 self.do_add_txs(txs_to_send.iter().map(|tx| tx.hash()).collect(), peer_id);
297 let txs_message = Message::Transactions(Transactions {
299 transactions: txs_to_send,
300 });
301 connection.outgoing_message(txs_message).await.unwrap_or_else(|err| {
302 debug!(peer_id = %format!("{:#x}", peer_id), err = ?err, "Failed to send transactions");
303 });
304 self.send_tx_hashes_internal(blob_txs.clone(), capabilities, &mut connection, peer_id)
305 .await?;
306 }
307 for (peer_id, mut connection, capabilities) in peers_to_send_hashes.iter().cloned() {
308 self.send_tx_hashes_internal(
310 txs_to_broadcast.clone(),
311 capabilities,
312 &mut connection,
313 peer_id,
314 )
315 .await?;
316 }
317 let broadcasted_hashes: Vec<H256> = txs_to_broadcast.iter().map(|tx| tx.hash()).collect();
318 self.blockchain
319 .mempool
320 .remove_broadcasted_txs(&broadcasted_hashes)?;
321 Ok(())
322 }
323
324 async fn send_tx_hashes_internal(
325 &mut self,
326 txs: Vec<MempoolTransaction>,
327 capabilities: Vec<Capability>,
328 connection: &mut PeerConnection,
329 peer_id: H256,
330 ) -> Result<(), TxBroadcasterError> {
331 let peer_idx = self.peer_index(peer_id);
332 let txs_to_send = txs
333 .iter()
334 .filter(|tx| {
335 let hash = tx.hash();
336 !self
337 .known_txs
338 .get(&hash)
339 .is_some_and(|record| record.peers.is_set(peer_idx))
340 && !tx.is_privileged()
341 })
342 .cloned()
343 .collect::<Vec<MempoolTransaction>>();
344 self.do_add_txs(txs_to_send.iter().map(|tx| tx.hash()).collect(), peer_id);
345 send_tx_hashes(
346 txs_to_send,
347 capabilities,
348 connection,
349 peer_id,
350 &self.blockchain,
351 )
352 .await
353 }
354}
355
/// Errors returned by the transaction broadcaster.
#[derive(Debug, thiserror::Error)]
pub enum TxBroadcasterError {
    /// The mempool could not provide the transactions to broadcast; the
    /// underlying error is discarded when this variant is produced.
    #[error("Failed to broadcast transactions")]
    Broadcast,
    /// A `StoreError` raised while building a hash announcement or updating the
    /// mempool's broadcast set.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// An `ActorError`; in this file it originates from the peer table request
    /// made at the start of a broadcast round.
    #[error(transparent)]
    PeerTableError(#[from] ActorError),
}
364}