1use crate::transactions::config::*;
18
19use codec::{Decode, Encode};
20use futures::{prelude::*, stream::FuturesUnordered};
21use log::{debug, trace, warn};
22
23use soil_prometheus::{register, Counter, PrometheusError, Registry, U64};
24use soil_client::utils::mpsc::{
25 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
26};
27use soil_network::common::{role::ObservedRole, ExHashT};
28use soil_network::sync::{SyncEvent, SyncEventStream};
29use soil_network::types::PeerId;
30use soil_network::{
31 config::{NonReservedPeerMode, ProtocolId, SetConfig},
32 error, multiaddr,
33 peer_store::PeerStoreProvider,
34 service::{
35 traits::{NotificationEvent, NotificationService, ValidationResult},
36 NotificationMetrics,
37 },
38 types::ProtocolName,
39 utils::{interval, LruHashSet},
40 NetworkBackend, NetworkEventStream, NetworkPeers,
41};
42use subsoil::runtime::traits::Block as BlockT;
43
44use std::{
45 collections::{hash_map::Entry, HashMap},
46 iter,
47 num::NonZeroUsize,
48 pin::Pin,
49 sync::Arc,
50 task::Poll,
51};
52
53pub mod config;
54
/// A list of transactions of type `E`.
///
/// Incoming notifications are decoded into this type, and outgoing transactions are
/// encoded from it.
pub type Transactions<E> = Vec<E>;
57
/// Log target shared by the sync-related log statements of this module.
const LOG_TARGET: &str = "sync";
60
61mod rep {
62 use soil_network::ReputationChange as Rep;
63 pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
68 pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
70 pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
72 pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
74}
75
76struct Metrics {
77 propagated_transactions: Counter<U64>,
78}
79
80impl Metrics {
81 fn register(r: &Registry) -> Result<Self, PrometheusError> {
82 Ok(Self {
83 propagated_transactions: register(
84 Counter::new(
85 "substrate_sync_propagated_transactions",
86 "Number of transactions propagated to at least one peer",
87 )?,
88 r,
89 )?,
90 })
91 }
92}
93
94struct PendingTransaction<H> {
95 validation: TransactionImportFuture,
96 tx_hash: H,
97}
98
99impl<H> Unpin for PendingTransaction<H> {}
100
101impl<H: ExHashT> Future for PendingTransaction<H> {
102 type Output = (H, TransactionImport);
103
104 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
105 if let Poll::Ready(import_result) = self.validation.poll_unpin(cx) {
106 return Poll::Ready((self.tx_hash.clone(), import_result));
107 }
108
109 Poll::Pending
110 }
111}
112
113pub struct TransactionsHandlerPrototype {
115 protocol_name: ProtocolName,
117
118 notification_service: Box<dyn NotificationService>,
120}
121
122impl TransactionsHandlerPrototype {
123 pub fn new<
125 Hash: AsRef<[u8]>,
126 Block: BlockT,
127 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
128 >(
129 protocol_id: ProtocolId,
130 genesis_hash: Hash,
131 fork_id: Option<&str>,
132 metrics: NotificationMetrics,
133 peer_store_handle: Arc<dyn PeerStoreProvider>,
134 ) -> (Self, Net::NotificationProtocolConfig) {
135 let genesis_hash = genesis_hash.as_ref();
136 let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
137 format!("/{}/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
138 } else {
139 format!("/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash))
140 }
141 .into();
142 let (config, notification_service) = Net::notification_config(
143 protocol_name.clone(),
144 vec![format!("/{}/transactions/1", protocol_id.as_ref()).into()],
145 MAX_TRANSACTIONS_SIZE,
146 None,
147 SetConfig {
148 in_peers: 0,
149 out_peers: 0,
150 reserved_nodes: Vec::new(),
151 non_reserved_mode: NonReservedPeerMode::Deny,
152 },
153 metrics,
154 peer_store_handle,
155 );
156
157 (Self { protocol_name, notification_service }, config)
158 }
159
160 pub fn build<
166 B: BlockT + 'static,
167 H: ExHashT,
168 N: NetworkPeers + NetworkEventStream,
169 S: SyncEventStream + soil_client::consensus::SyncOracle,
170 >(
171 self,
172 network: N,
173 sync: S,
174 transaction_pool: Arc<dyn TransactionPool<H, B>>,
175 metrics_registry: Option<&Registry>,
176 ) -> error::Result<(TransactionsHandler<B, H, N, S>, TransactionsHandlerController<H>)> {
177 let sync_event_stream = sync.event_stream("transactions-handler-sync");
178 let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000);
179
180 let handler = TransactionsHandler {
181 protocol_name: self.protocol_name,
182 notification_service: self.notification_service,
183 propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
184 as Pin<Box<dyn Stream<Item = ()> + Send>>)
185 .fuse(),
186 pending_transactions: FuturesUnordered::new(),
187 pending_transactions_peers: HashMap::new(),
188 network,
189 sync,
190 sync_event_stream: sync_event_stream.fuse(),
191 peers: HashMap::new(),
192 transaction_pool,
193 from_controller,
194 metrics: if let Some(r) = metrics_registry {
195 Some(Metrics::register(r)?)
196 } else {
197 None
198 },
199 };
200
201 let controller = TransactionsHandlerController { to_handler };
202
203 Ok((handler, controller))
204 }
205}
206
207pub struct TransactionsHandlerController<H: ExHashT> {
209 to_handler: TracingUnboundedSender<ToHandler<H>>,
210}
211
212impl<H: ExHashT> TransactionsHandlerController<H> {
213 pub fn propagate_transactions(&self) {
218 let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);
219 }
220
221 pub fn propagate_transaction(&self, hash: H) {
226 let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));
227 }
228}
229
230enum ToHandler<H: ExHashT> {
231 PropagateTransactions,
232 PropagateTransaction(H),
233}
234
235pub struct TransactionsHandler<
237 B: BlockT + 'static,
238 H: ExHashT,
239 N: NetworkPeers + NetworkEventStream,
240 S: SyncEventStream + soil_client::consensus::SyncOracle,
241> {
242 protocol_name: ProtocolName,
243 propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
245 pending_transactions: FuturesUnordered<PendingTransaction<H>>,
247 pending_transactions_peers: HashMap<H, Vec<PeerId>>,
252 network: N,
254 sync: S,
256 sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
258 peers: HashMap<PeerId, Peer<H>>,
260 transaction_pool: Arc<dyn TransactionPool<H, B>>,
261 from_controller: TracingUnboundedReceiver<ToHandler<H>>,
262 metrics: Option<Metrics>,
264 notification_service: Box<dyn NotificationService>,
266}
267
268#[derive(Debug)]
270struct Peer<H: ExHashT> {
271 known_transactions: LruHashSet<H>,
273 role: ObservedRole,
274}
275
276impl<B, H, N, S> TransactionsHandler<B, H, N, S>
277where
278 B: BlockT + 'static,
279 H: ExHashT,
280 N: NetworkPeers + NetworkEventStream,
281 S: SyncEventStream + soil_client::consensus::SyncOracle,
282{
283 pub async fn run(mut self) {
286 loop {
287 futures::select! {
288 _ = self.propagate_timeout.next() => {
289 self.propagate_transactions();
290 },
291 (tx_hash, result) = self.pending_transactions.select_next_some() => {
292 if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
293 peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
294 } else {
295 warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
296 }
297 },
298 sync_event = self.sync_event_stream.next() => {
299 if let Some(sync_event) = sync_event {
300 self.handle_sync_event(sync_event);
301 } else {
302 return;
304 }
305 }
306 message = self.from_controller.select_next_some() => {
307 match message {
308 ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
309 ToHandler::PropagateTransactions => self.propagate_transactions(),
310 }
311 },
312 event = self.notification_service.next_event().fuse() => {
313 if let Some(event) = event {
314 self.handle_notification_event(event)
315 } else {
316 return
318 }
319 }
320 }
321 }
322 }
323
324 fn handle_notification_event(&mut self, event: NotificationEvent) {
325 match event {
326 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
327 let result = self
329 .network
330 .peer_role(peer, handshake)
331 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
332 let _ = result_tx.send(result);
333 },
334 NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
335 let Some(role) = self.network.peer_role(peer, handshake) else {
336 log::debug!(target: "sub-libp2p", "role for {peer} couldn't be determined");
337 return;
338 };
339
340 let _was_in = self.peers.insert(
341 peer,
342 Peer {
343 known_transactions: LruHashSet::new(
344 NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero"),
345 ),
346 role,
347 },
348 );
349 debug_assert!(_was_in.is_none());
350 },
351 NotificationEvent::NotificationStreamClosed { peer } => {
352 let _peer = self.peers.remove(&peer);
353 debug_assert!(_peer.is_some());
354 },
355 NotificationEvent::NotificationReceived { peer, notification } => {
356 if let Ok(m) =
357 <Transactions<B::Extrinsic> as Decode>::decode(&mut notification.as_ref())
358 {
359 self.on_transactions(peer, m);
360 } else {
361 warn!(target: "sub-libp2p", "Failed to decode transactions list from peer {peer}");
362 self.network.report_peer(peer, rep::BAD_TRANSACTION);
363 }
364 },
365 }
366 }
367
368 fn handle_sync_event(&mut self, event: SyncEvent) {
369 match event {
370 SyncEvent::PeerConnected(remote) => {
371 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
372 .collect::<multiaddr::Multiaddr>();
373 let result = self.network.add_peers_to_reserved_set(
374 self.protocol_name.clone(),
375 iter::once(addr).collect(),
376 );
377 if let Err(err) = result {
378 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
379 }
380 },
381 SyncEvent::PeerDisconnected(remote) => {
382 let result = self.network.remove_peers_from_reserved_set(
383 self.protocol_name.clone(),
384 iter::once(remote).collect(),
385 );
386 if let Err(err) = result {
387 log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);
388 }
389 },
390 }
391 }
392
393 fn on_transactions(&mut self, who: PeerId, transactions: Transactions<B::Extrinsic>) {
395 if self.sync.is_major_syncing() {
397 trace!(target: LOG_TARGET, "{} Ignoring transactions while major syncing", who);
398 return;
399 }
400
401 trace!(target: LOG_TARGET, "Received {} transactions from {}", transactions.len(), who);
402 if let Some(ref mut peer) = self.peers.get_mut(&who) {
403 for t in transactions {
404 if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
405 debug!(
406 target: LOG_TARGET,
407 "Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
408 MAX_PENDING_TRANSACTIONS,
409 );
410 break;
411 }
412
413 let hash = self.transaction_pool.hash_of(&t);
414 peer.known_transactions.insert(hash.clone());
415
416 self.network.report_peer(who, rep::ANY_TRANSACTION);
417
418 match self.pending_transactions_peers.entry(hash.clone()) {
419 Entry::Vacant(entry) => {
420 self.pending_transactions.push(PendingTransaction {
421 validation: self.transaction_pool.import(t),
422 tx_hash: hash,
423 });
424 entry.insert(vec![who]);
425 },
426 Entry::Occupied(mut entry) => {
427 entry.get_mut().push(who);
428 },
429 }
430 }
431 }
432 }
433
434 fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
435 match import {
436 TransactionImport::KnownGood => {
437 self.network.report_peer(who, rep::ANY_TRANSACTION_REFUND)
438 },
439 TransactionImport::NewGood => self.network.report_peer(who, rep::GOOD_TRANSACTION),
440 TransactionImport::Bad => self.network.report_peer(who, rep::BAD_TRANSACTION),
441 TransactionImport::None => {},
442 }
443 }
444
445 pub fn propagate_transaction(&mut self, hash: &H) {
447 if self.sync.is_major_syncing() {
449 return;
450 }
451
452 debug!(target: LOG_TARGET, "Propagating transaction [{:?}]", hash);
453 if let Some(transaction) = self.transaction_pool.transaction(hash) {
454 let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
455 self.transaction_pool.on_broadcasted(propagated_to);
456 } else {
457 debug!(target: "sync", "Propagating transaction failure [{:?}]", hash);
458 }
459 }
460
461 fn do_propagate_transactions(
462 &mut self,
463 transactions: &[(H, Arc<B::Extrinsic>)],
464 ) -> HashMap<H, Vec<String>> {
465 let mut propagated_to = HashMap::<_, Vec<_>>::new();
466 let mut propagated_transactions = 0;
467
468 for (who, peer) in self.peers.iter_mut() {
469 if matches!(peer.role, ObservedRole::Light) {
471 continue;
472 }
473
474 let (hashes, to_send): (Vec<_>, Transactions<_>) = transactions
475 .iter()
476 .filter(|(hash, _)| peer.known_transactions.insert(hash.clone()))
477 .cloned()
478 .unzip();
479
480 propagated_transactions += hashes.len();
481
482 if !to_send.is_empty() {
483 for hash in hashes {
484 propagated_to.entry(hash).or_default().push(who.to_base58());
485 }
486 trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
487 for to_send in to_send {
497 let _ = self
498 .notification_service
499 .send_sync_notification(who, vec![to_send].encode());
500 }
501 }
502 }
503
504 if let Some(ref metrics) = self.metrics {
505 metrics.propagated_transactions.inc_by(propagated_transactions as _)
506 }
507
508 propagated_to
509 }
510
511 fn propagate_transactions(&mut self) {
513 if self.sync.is_major_syncing() {
515 return;
516 }
517
518 let transactions = self.transaction_pool.transactions();
519
520 if transactions.is_empty() {
521 return;
522 }
523
524 debug!(target: LOG_TARGET, "Propagating transactions");
525
526 let propagated_to = self.do_propagate_transactions(&transactions);
527 self.transaction_pool.on_broadcasted(propagated_to);
528 }
529}